[SPARK-48767][SQL] Fix some error prompts when variant type data is invalid

### What changes were proposed in this pull request?
This PR aims to:
- fix some error prompts when `variant` type data is invalid.
- provide a clear `error-condition` for `variant`-related errors.
- use `checkError` to check exceptions in the `VariantSuite` class.

### Why are the changes needed?
- Reproduction examples
  <img width="446" alt="image" src="https://github.com/apache/spark/assets/15246973/840690ea-7378-4fd8-a3b3-4459a66f989b">

  For full reproduction details, please refer to: #47162 (comment)

- When the `variant` data contains only a `value` field or only a `metadata` field, the reader throws `variant with more than two fields`, which is obviously incorrect (see the sketch after this list).
https://github.com/apache/spark/blob/930422389352b8349e5a845c8cae9993d30dce17/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala#L405-L408

- The error prompt differs depending on whether `SQLConf.PARQUET_VECTORIZED_READER_ENABLED` is true or false; both code paths should report the same error.
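
A minimal sketch of the single-field case, adapted from the updated `VariantSuite` test below (assuming a `spark-shell` session built from this branch; the output path and the `mode("overwrite")` call are illustrative):

```scala
// Write a struct with only a `value` field, then read it back as VARIANT.
import org.apache.spark.sql.types.{StructField, StructType, VariantType}

val path = "/tmp/invalid_variant"  // illustrative path
// X'8' is the binary encoding of a literal "false" variant value.
spark.sql("select named_struct('value', X'8') as v from range(10)")
  .write.mode("overwrite").parquet(path)

val schema = StructType(Seq(StructField("v", VariantType)))
// Before this PR: fails with the misleading "variant with more than two fields".
// After this PR: INVALID_VARIANT_FROM_PARQUET.WRONG_NUM_FIELDS.
spark.read.schema(schema).parquet(path).selectExpr("to_json(v)").collect()
```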

### Does this PR introduce _any_ user-facing change?
Yes, it fixes some error prompts when `variant` type data is invalid.
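
For example, based on the new `error-conditions.json` entry, reading a Parquet variant struct that is missing its `metadata` field should now report something like the following (the exact rendering is an assumption from the message template):

```
[INVALID_VARIANT_FROM_PARQUET.MISSING_FIELD] Invalid variant. Missing metadata field. SQLSTATE: 22023
```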

### How was this patch tested?
Existing UTs and updated UTs.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #47162 from panbingkun/SPARK-48767.

Authored-by: panbingkun <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
panbingkun authored and cloud-fan committed Jul 5, 2024
1 parent a2f8001 commit 9f22fa4
Showing 5 changed files with 141 additions and 53 deletions.
23 changes: 23 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -2937,6 +2937,29 @@
     ],
     "sqlState" : "22023"
   },
+  "INVALID_VARIANT_FROM_PARQUET" : {
+    "message" : [
+      "Invalid variant."
+    ],
+    "subClass" : {
+      "MISSING_FIELD" : {
+        "message" : [
+          "Missing <field> field."
+        ]
+      },
+      "NULLABLE_OR_NOT_BINARY_FIELD" : {
+        "message" : [
+          "The <field> must be a non-nullable binary."
+        ]
+      },
+      "WRONG_NUM_FIELDS" : {
+        "message" : [
+          "Variant column must contain exactly two fields."
+        ]
+      }
+    },
+    "sqlState" : "22023"
+  },
   "INVALID_VARIANT_GET_PATH" : {
     "message" : [
       "The path `<path>` is not a valid variant extraction path in `<functionName>`.",
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -1921,6 +1921,21 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
       messageParameters = Map("col" -> col, "schema" -> schema.catalogString))
   }
 
+  def invalidVariantMissingFieldError(field: String): Throwable = {
+    new AnalysisException(errorClass = "INVALID_VARIANT_FROM_PARQUET.MISSING_FIELD",
+      messageParameters = Map("field" -> field))
+  }
+
+  def invalidVariantNullableOrNotBinaryFieldError(field: String): Throwable = {
+    new AnalysisException(errorClass = "INVALID_VARIANT_FROM_PARQUET.NULLABLE_OR_NOT_BINARY_FIELD",
+      messageParameters = Map("field" -> field))
+  }
+
+  def invalidVariantWrongNumFieldsError(): Throwable = {
+    new AnalysisException(errorClass = "INVALID_VARIANT_FROM_PARQUET.WRONG_NUM_FIELDS",
+      messageParameters = Map.empty)
+  }
+
   def parquetTypeUnsupportedYetError(parquetType: String): Throwable = {
     new AnalysisException(
       errorClass = "_LEGACY_ERROR_TEMP_1172",
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -857,20 +857,18 @@ private[parquet] class ParquetRowConverter(
   private[this] val converters = {
     if (parquetType.getFieldCount() != 2) {
       // We may allow more than two children in the future, so consider this unsupported.
-      throw QueryCompilationErrors.
-        parquetTypeUnsupportedYetError("variant column must contain exactly two fields")
+      throw QueryCompilationErrors.invalidVariantWrongNumFieldsError()
     }
     val valueAndMetadata = Seq("value", "metadata").map { colName =>
       val idx = (0 until parquetType.getFieldCount())
         .find(parquetType.getFieldName(_) == colName)
       if (idx.isEmpty) {
-        throw QueryCompilationErrors.illegalParquetTypeError(s"variant missing $colName field")
+        throw QueryCompilationErrors.invalidVariantMissingFieldError(colName)
       }
       val child = parquetType.getType(idx.get)
       if (!child.isPrimitive || child.getRepetition != Type.Repetition.REQUIRED ||
           child.asPrimitiveType().getPrimitiveTypeName != BINARY) {
-        throw QueryCompilationErrors.illegalParquetTypeError(
-          s"variant column must be a non-nullable binary")
+        throw QueryCompilationErrors.invalidVariantNullableOrNotBinaryFieldError(colName)
       }
       child
     }
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -404,23 +404,21 @@ class ParquetToSparkSchemaConverter(
   private def convertVariantField(groupColumn: GroupColumnIO): ParquetColumn = {
     if (groupColumn.getChildrenCount != 2) {
       // We may allow more than two children in the future, so consider this unsupported.
-      throw QueryCompilationErrors.
-        parquetTypeUnsupportedYetError("variant with more than two fields")
+      throw QueryCompilationErrors.invalidVariantWrongNumFieldsError()
     }
     // Find the binary columns, and validate that they have the correct type.
     val valueAndMetadata = Seq("value", "metadata").map { colName =>
       val idx = (0 until groupColumn.getChildrenCount)
         .find(groupColumn.getChild(_).getName == colName)
       if (idx.isEmpty) {
-        throw QueryCompilationErrors.illegalParquetTypeError(s"variant missing $colName field")
+        throw QueryCompilationErrors.invalidVariantMissingFieldError(colName)
       }
       val child = groupColumn.getChild(idx.get)
       // The value and metadata cannot be individually null, only the full struct can.
       if (child.getType.getRepetition != REQUIRED ||
           !child.isInstanceOf[PrimitiveColumnIO] ||
           child.asInstanceOf[PrimitiveColumnIO].getPrimitive != BINARY) {
-        throw QueryCompilationErrors.illegalParquetTypeError(
-          s"variant $colName must be a non-nullable binary")
+        throw QueryCompilationErrors.invalidVariantNullableOrNotBinaryFieldError(colName)
       }
       child
     }
140 changes: 97 additions & 43 deletions sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
@@ -219,9 +219,13 @@ class VariantSuite extends QueryTest with SharedSparkSession with ExpressionEval
     // Partitioning by Variant column is not allowed.
     withTempDir { dir =>
       val tempDir = new File(dir, "files").getCanonicalPath
-      intercept[AnalysisException] {
-        query.write.partitionBy("v").parquet(tempDir)
-      }
+      checkError(
+        exception = intercept[AnalysisException] {
+          query.write.partitionBy("v").parquet(tempDir)
+        },
+        errorClass = "INVALID_PARTITION_COLUMN_DATA_TYPE",
+        parameters = Map("type" -> "\"VARIANT\"")
+      )
     }
 
     // Same as above, using saveAsTable
@@ -231,9 +235,13 @@
     }
 
     withTable("t") {
-      intercept[AnalysisException] {
-        query.write.partitionBy("v").saveAsTable("t")
-      }
+      checkError(
+        exception = intercept[AnalysisException] {
+          query.write.partitionBy("v").saveAsTable("t")
+        },
+        errorClass = "INVALID_PARTITION_COLUMN_DATA_TYPE",
+        parameters = Map("type" -> "\"VARIANT\"")
+      )
     }
 
     // Same as above, using SQL CTAS
@@ -243,9 +251,13 @@
     }
 
     withTable("t") {
-      intercept[AnalysisException] {
-        spark.sql(s"CREATE TABLE t USING PARQUET PARTITIONED BY (v) AS $queryString")
-      }
+      checkError(
+        exception = intercept[AnalysisException] {
+          spark.sql(s"CREATE TABLE t USING PARQUET PARTITIONED BY (v) AS $queryString")
+        },
+        errorClass = "INVALID_PARTITION_COLUMN_DATA_TYPE",
+        parameters = Map("type" -> "\"VARIANT\"")
+      )
     }
   }
 
@@ -263,25 +275,36 @@
     // Binary value of a literal "false"
     val v = "X'8'"
     val cases = Seq(
-      s"named_struct('value', $v, 'metadata', $m, 'paths', $v)",
-      s"named_struct('value', $v, 'dictionary', $m)",
-      s"named_struct('val', $v, 'metadata', $m)",
-      s"named_struct('value', 8, 'metadata', $m)",
-      s"named_struct('value', cast(null as binary), 'metadata', $m)",
-      s"named_struct('value', $v, 'metadata', cast(null as binary))"
+      (s"named_struct('value', $v)",
+        "INVALID_VARIANT_FROM_PARQUET.WRONG_NUM_FIELDS", Map.empty[String, String]),
+      (s"named_struct('value', $v, 'metadata', $m, 'paths', $v)",
+        "INVALID_VARIANT_FROM_PARQUET.WRONG_NUM_FIELDS", Map.empty[String, String]),
+      (s"named_struct('value', $v, 'dictionary', $m)",
+        "INVALID_VARIANT_FROM_PARQUET.MISSING_FIELD", Map("field" -> "metadata")),
+      (s"named_struct('val', $v, 'metadata', $m)",
+        "INVALID_VARIANT_FROM_PARQUET.MISSING_FIELD", Map("field" -> "value")),
+      (s"named_struct('value', 8, 'metadata', $m)",
+        "INVALID_VARIANT_FROM_PARQUET.NULLABLE_OR_NOT_BINARY_FIELD", Map("field" -> "value")),
+      (s"named_struct('value', cast(null as binary), 'metadata', $m)",
+        "INVALID_VARIANT_FROM_PARQUET.NULLABLE_OR_NOT_BINARY_FIELD", Map("field" -> "value")),
+      (s"named_struct('value', $v, 'metadata', cast(null as binary))",
+        "INVALID_VARIANT_FROM_PARQUET.NULLABLE_OR_NOT_BINARY_FIELD", Map("field" -> "metadata"))
     )
-    cases.foreach { structDef =>
+    cases.foreach { case (structDef, errorClass, parameters) =>
       Seq(false, true).foreach { vectorizedReader =>
-        withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key ->
-          vectorizedReader.toString) {
+        withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorizedReader.toString) {
           withTempDir { dir =>
             val file = new File(dir, "dir").getCanonicalPath
             val df = spark.sql(s"select $structDef as v from range(10)")
             df.write.parquet(file)
             val schema = StructType(Seq(StructField("v", VariantType)))
             val result = spark.read.schema(schema).parquet(file).selectExpr("to_json(v)")
             val e = intercept[org.apache.spark.SparkException](result.collect())
-            assert(e.getCause.isInstanceOf[AnalysisException], e.printStackTrace)
+            checkError(
+              exception = e.getCause.asInstanceOf[AnalysisException],
+              errorClass = errorClass,
+              parameters = parameters
+            )
           }
         }
       }
@@ -306,8 +329,7 @@
         val df = spark.sql(s"select $structDef as v from range(10)")
         df.write.parquet(file)
         val schema = StructType(Seq(StructField("v", VariantType)))
-        val result = spark.read.schema(schema).parquet(file)
-          .selectExpr("to_json(v)")
+        val result = spark.read.schema(schema).parquet(file).selectExpr("to_json(v)")
         checkAnswer(result, Seq.fill(10)(Row("false")))
       }
     }
@@ -396,32 +418,64 @@ class VariantSuite extends QueryTest with SharedSparkSession with ExpressionEval
   }
 
   test("group/order/join variant are disabled") {
-    var ex = intercept[AnalysisException] {
-      spark.sql("select parse_json('') group by 1")
-    }
-    assert(ex.getErrorClass == "GROUP_EXPRESSION_TYPE_IS_NOT_ORDERABLE")
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.sql("select parse_json('') group by 1")
+      },
+      errorClass = "GROUP_EXPRESSION_TYPE_IS_NOT_ORDERABLE",
+      parameters = Map("sqlExpr" -> "\"parse_json()\"", "dataType" -> "\"VARIANT\""),
+      context = ExpectedContext(fragment = "parse_json('')", start = 7, stop = 20)
+    )
 
-    ex = intercept[AnalysisException] {
-      spark.sql("select parse_json('') order by 1")
-    }
-    assert(ex.getErrorClass == "DATATYPE_MISMATCH.INVALID_ORDERING_TYPE")
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.sql("select parse_json('') order by 1")
+      },
+      errorClass = "DATATYPE_MISMATCH.INVALID_ORDERING_TYPE",
+      parameters = Map(
+        "functionName" -> "`sortorder`",
+        "dataType" -> "\"VARIANT\"",
+        "sqlExpr" -> "\"parse_json() ASC NULLS FIRST\""),
+      context = ExpectedContext(fragment = "order by 1", start = 22, stop = 31)
+    )
 
-    ex = intercept[AnalysisException] {
-      spark.sql("select parse_json('') sort by 1")
-    }
-    assert(ex.getErrorClass == "DATATYPE_MISMATCH.INVALID_ORDERING_TYPE")
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.sql("select parse_json('') sort by 1")
+      },
+      errorClass = "DATATYPE_MISMATCH.INVALID_ORDERING_TYPE",
+      parameters = Map(
+        "functionName" -> "`sortorder`",
+        "dataType" -> "\"VARIANT\"",
+        "sqlExpr" -> "\"parse_json() ASC NULLS FIRST\""),
+      context = ExpectedContext(fragment = "sort by 1", start = 22, stop = 30)
+    )
 
-    ex = intercept[AnalysisException] {
-      spark.sql("with t as (select 1 as a, parse_json('') as v) " +
-        "select rank() over (partition by a order by v) from t")
-    }
-    assert(ex.getErrorClass == "DATATYPE_MISMATCH.INVALID_ORDERING_TYPE")
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.sql("with t as (select 1 as a, parse_json('') as v) " +
+          "select rank() over (partition by a order by v) from t")
+      },
+      errorClass = "DATATYPE_MISMATCH.INVALID_ORDERING_TYPE",
+      parameters = Map(
+        "functionName" -> "`sortorder`",
+        "dataType" -> "\"VARIANT\"",
+        "sqlExpr" -> "\"v ASC NULLS FIRST\""),
+      context = ExpectedContext(fragment = "v", start = 91, stop = 91)
+    )
 
-    ex = intercept[AnalysisException] {
-      spark.sql("with t as (select parse_json('') as v) " +
-        "select t1.v from t as t1 join t as t2 on t1.v = t2.v")
-    }
-    assert(ex.getErrorClass == "DATATYPE_MISMATCH.INVALID_ORDERING_TYPE")
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.sql("with t as (select parse_json('') as v) " +
+          "select t1.v from t as t1 join t as t2 on t1.v = t2.v")
+      },
+      errorClass = "DATATYPE_MISMATCH.INVALID_ORDERING_TYPE",
+      parameters = Map(
+        "functionName" -> "`=`",
+        "dataType" -> "\"VARIANT\"",
+        "sqlExpr" -> "\"(v = v)\""),
+      context = ExpectedContext(fragment = "t1.v = t2.v", start = 80, stop = 90)
+    )
   }
 
   test("variant_explode") {