Skip to content

Commit

Permalink
Migrate CatalogV2Util
Browse files Browse the repository at this point in the history
  • Loading branch information
MaxGekk committed Feb 9, 2024
1 parent c3b049a commit 51cfec7
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 13 deletions.
45 changes: 45 additions & 0 deletions common/utils/src/main/resources/error/error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -7625,6 +7625,51 @@
"The value (<other>) of the type (<otherClass>) cannot be converted to a map type with key type (<keyType>) and value type (<valueType>)"
]
},
"_LEGACY_ERROR_TEMP_3222" : {
"message" : [
"Only literals are allowed in the partition spec, but got <expr>"
]
},
"_LEGACY_ERROR_TEMP_3223" : {
"message" : [
"Cannot find field: <name> in <dataType>"
]
},
"_LEGACY_ERROR_TEMP_3224" : {
"message" : [
"Cannot delete array element"
]
},
"_LEGACY_ERROR_TEMP_3225" : {
"message" : [
"Cannot delete map value"
]
},
"_LEGACY_ERROR_TEMP_3226" : {
"message" : [
"Cannot delete map key"
]
},
"_LEGACY_ERROR_TEMP_3227" : {
"message" : [
"Cannot find field: <fieldName>"
]
},
"_LEGACY_ERROR_TEMP_3228" : {
"message" : [
"AFTER column not found: <afterCol>"
]
},
"_LEGACY_ERROR_TEMP_3229" : {
"message" : [
"Not a struct: <name>"
]
},
"_LEGACY_ERROR_TEMP_3230" : {
"message" : [
"Field not found: <name>"
]
},
"_LEGACY_ERROR_USER_RAISED_EXCEPTION" : {
"message" : [
"<errorMessage>"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.antlr.v4.runtime.tree.{ParseTree, RuleNode, TerminalNode}
import org.apache.commons.codec.DecoderException
import org.apache.commons.codec.binary.Hex

import org.apache.spark.{SparkArithmeticException, SparkException}
import org.apache.spark.{SparkArithmeticException, SparkException, SparkIllegalArgumentException}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
Expand Down Expand Up @@ -681,8 +681,10 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
Cast(l, StringType, Some(conf.sessionLocalTimeZone)).eval().toString
}
case other =>
throw new IllegalArgumentException(s"Only literals are allowed in the " +
s"partition spec, but got ${other.sql}")
throw new SparkIllegalArgumentException(
errorClass = "_LEGACY_ERROR_TEMP_3222",
messageParameters = Map("expr" -> other.sql)
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util.Collections

import scala.jdk.CollectionConverters._

import org.apache.spark.SparkIllegalArgumentException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.CurrentUserContext
import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec}
Expand Down Expand Up @@ -163,7 +164,9 @@ private[sql] object CatalogV2Util {
addField(parentType, fieldWithComment, add.position(), tableProvider,
statementType, true)))
case _ =>
throw new IllegalArgumentException(s"Not a struct: ${names.init.last}")
throw new SparkIllegalArgumentException(
errorClass = "_LEGACY_ERROR_TEMP_3229",
messageParameters = Map("name" -> names.init.last))
})
}

Expand All @@ -188,7 +191,9 @@ private[sql] object CatalogV2Util {
case update: UpdateColumnPosition =>
def updateFieldPos(struct: StructType, name: String): StructType = {
val oldField = struct.fields.find(_.name == name).getOrElse {
throw new IllegalArgumentException("Field not found: " + name)
throw new SparkIllegalArgumentException(
errorClass = "_LEGACY_ERROR_TEMP_3230",
messageParameters = Map("name" -> name))
}
val withFieldRemoved = StructType(struct.fields.filter(_ != oldField))
addField(withFieldRemoved, oldField, update.position(), tableProvider, statementType,
Expand All @@ -203,7 +208,9 @@ private[sql] object CatalogV2Util {
case parentType: StructType =>
Some(parent.copy(dataType = updateFieldPos(parentType, names.last)))
case _ =>
throw new IllegalArgumentException(s"Not a struct: ${names.init.last}")
throw new SparkIllegalArgumentException(
errorClass = "_LEGACY_ERROR_TEMP_3229",
messageParameters = Map("name" -> names.init.last))
})
}

Expand Down Expand Up @@ -244,7 +251,9 @@ private[sql] object CatalogV2Util {
val afterCol = position.asInstanceOf[After].column()
val fieldIndex = schema.fields.indexWhere(_.name == afterCol)
if (fieldIndex == -1) {
throw new IllegalArgumentException("AFTER column not found: " + afterCol)
throw new SparkIllegalArgumentException(
errorClass = "_LEGACY_ERROR_TEMP_3228",
messageParameters = Map("afterCol" -> afterCol))
}
val (before, after) = schema.fields.splitAt(fieldIndex + 1)
StructType(before ++ (field +: after))
Expand All @@ -267,7 +276,9 @@ private[sql] object CatalogV2Util {
// Currently only DROP COLUMN may pass down the IF EXISTS parameter
return struct
} else {
throw new IllegalArgumentException(s"Cannot find field: ${fieldNames.head}")
throw new SparkIllegalArgumentException(
errorClass = "_LEGACY_ERROR_TEMP_3227",
messageParameters = Map("fieldName" -> fieldNames.head))
}
}

Expand All @@ -283,15 +294,15 @@ private[sql] object CatalogV2Util {

case (Seq("key"), map @ MapType(keyType, _, _)) =>
val updated = update(StructField("key", keyType, nullable = false))
.getOrElse(throw new IllegalArgumentException(s"Cannot delete map key"))
.getOrElse(throw new SparkIllegalArgumentException("_LEGACY_ERROR_TEMP_3226"))
Some(field.copy(dataType = map.copy(keyType = updated.dataType)))

case (Seq("key", names @ _*), map @ MapType(keyStruct: StructType, _, _)) =>
Some(field.copy(dataType = map.copy(keyType = replace(keyStruct, names, update, ifExists))))

case (Seq("value"), map @ MapType(_, mapValueType, isNullable)) =>
val updated = update(StructField("value", mapValueType, nullable = isNullable))
.getOrElse(throw new IllegalArgumentException(s"Cannot delete map value"))
.getOrElse(throw new SparkIllegalArgumentException("_LEGACY_ERROR_TEMP_3225"))
Some(field.copy(dataType = map.copy(
valueType = updated.dataType,
valueContainsNull = updated.nullable)))
Expand All @@ -302,7 +313,7 @@ private[sql] object CatalogV2Util {

case (Seq("element"), array @ ArrayType(elementType, isNullable)) =>
val updated = update(StructField("element", elementType, nullable = isNullable))
.getOrElse(throw new IllegalArgumentException(s"Cannot delete array element"))
.getOrElse(throw new SparkIllegalArgumentException("_LEGACY_ERROR_TEMP_3224"))
Some(field.copy(dataType = array.copy(
elementType = updated.dataType,
containsNull = updated.nullable)))
Expand All @@ -313,8 +324,9 @@ private[sql] object CatalogV2Util {

case (names, dataType) =>
if (!ifExists) {
throw new IllegalArgumentException(
s"Cannot find field: ${names.head} in ${dataType.simpleString}")
throw new SparkIllegalArgumentException(
errorClass = "_LEGACY_ERROR_TEMP_3223",
messageParameters = Map("name" -> names.head, "dataType" -> dataType.simpleString))
}
None
}
Expand Down

0 comments on commit 51cfec7

Please sign in to comment.