Skip to content

Commit

Permalink
Require AnalysisException with an error class only
Browse files Browse the repository at this point in the history
  • Loading branch information
MaxGekk committed Dec 9, 2023
1 parent 8e95929 commit 826e051
Show file tree
Hide file tree
Showing 15 changed files with 169 additions and 28 deletions.
83 changes: 83 additions & 0 deletions common/utils/src/main/resources/error/error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -6710,5 +6710,88 @@
"<errorMessage>"
],
"sqlState" : "P0001"
},
"_LEGACY_ERROR_TEMP_3050" : {
"message" : [
"Cannot modify the value of a static config: <k>"
]
},
"_LEGACY_ERROR_TEMP_3051" : {
"message" : [
"When resolving <u>, fail to find subplan with plan_id=<planId> in <q>"
]
},
"_LEGACY_ERROR_TEMP_3052" : {
"message" : [
"Unexpected resolved action: <other>"
]
},
"_LEGACY_ERROR_TEMP_3053" : {
"message" : [
"Unexpected WHEN NOT MATCHED action: <other>"
]
},
"_LEGACY_ERROR_TEMP_3054" : {
"message" : [
"<expr> is not currently supported"
]
},
"_LEGACY_ERROR_TEMP_3055" : {
"message" : [
"ScalarFunction '<scalarFunc>' neither implement magic method nor override 'produceResult'"
]
},
"_LEGACY_ERROR_TEMP_3056" : {
"message" : [
"Unexpected row-level read relations (allow multiple = <allowMultipleReads>): <other>"
]
},
"_LEGACY_ERROR_TEMP_3057" : {
"message" : [
"Cannot retrieve row-level operation from <table>"
]
},
"_LEGACY_ERROR_TEMP_3058" : {
"message" : [
"Found duplicate column(s) <checkType>: <duplicateColumns>"
]
},
"_LEGACY_ERROR_TEMP_3059" : {
"message" : [
"The positions provided (<pos>) cannot be resolved in",
"<schema>"
]
},
"_LEGACY_ERROR_TEMP_3060" : {
"message" : [
"Couldn't find column <i> in:",
"<schema>"
]
},
"_LEGACY_ERROR_TEMP_3061" : {
"message" : [
"<e>",
"<schema>"
]
},
"_LEGACY_ERROR_TEMP_3062" : {
"message" : [
"Expected <columnPath> to be a nested data type, but found <o>. Was looking for the index of <attr> in a nested field"
]
},
"_LEGACY_ERROR_TEMP_3063" : {
"message" : [
"pivot is not supported on a streaming DataFrames/Datasets"
]
},
"_LEGACY_ERROR_TEMP_3064" : {
"message" : [
"<msg>"
]
},
"_LEGACY_ERROR_TEMP_3065" : {
"message" : [
"<clazz>: <msg>"
]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ trait SQLHelper {
if (spark.conf.isModifiable(k)) {
spark.conf.set(k, v)
} else {
throw new AnalysisException(s"Cannot modify the value of a static config: $k")
throw new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_3050",
messageParameters = Map("k" -> k))
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,8 @@ private[client] object GrpcExceptionConverter {
queryContext = params.queryContext)),
errorConstructor(params =>
new AnalysisException(
params.message,
cause = params.cause,
errorClass = params.errorClass,
errorClass = params.errorClass.orNull,
messageParameters = params.messageParameters,
context = params.queryContext)),
errorConstructor(params =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.trees.{Origin, WithOrigin}
* @since 1.3.0
*/
@Stable
class AnalysisException protected[sql] (
class AnalysisException protected(
val message: String,
val line: Option[Int] = None,
val startPosition: Option[Int] = None,
Expand All @@ -49,6 +49,18 @@ class AnalysisException protected[sql] (
messageParameters = messageParameters,
cause = cause)

def this(
errorClass: String,
messageParameters: Map[String, String],
context: Array[QueryContext],
cause: Option[Throwable]) =
this(
SparkThrowableHelper.getMessage(errorClass, messageParameters),
errorClass = Some(errorClass),
messageParameters = messageParameters,
context = context,
cause = cause)

def this(
errorClass: String,
messageParameters: Map[String, String],
Expand All @@ -69,6 +81,8 @@ class AnalysisException protected[sql] (
messageParameters = messageParameters,
cause = None)

def this(errorClass: String) = this(errorClass = errorClass, messageParameters = Map.empty)

def this(
errorClass: String,
messageParameters: Map[String, String],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ trait SQLConfHelper {
}
keys.lazyZip(values).foreach { (k, v) =>
if (SQLConf.isStaticConfigKey(k)) {
throw new AnalysisException(s"Cannot modify the value of a static config: $k")
throw new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_3050",
messageParameters = Map("k" -> k))
}
conf.setConfString(k, v)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,8 +513,12 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
// df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]])
// df2 = spark.createDataFrame([Row(a = 1, b = 2)]])
// df1.select(df2.a) <- illegal reference df2.a
throw new AnalysisException(s"When resolving $u, " +
s"fail to find subplan with plan_id=$planId in\n$q")
throw new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_3051",
messageParameters = Map(
"u" -> u.toString,
"planId" -> planId.toString,
"q" -> q.toString))
}
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ object ResolveRowLevelCommandAssignments extends Rule[LogicalPlan] {
case i @ InsertAction(_, assignments) =>
i.copy(assignments = AssignmentUtils.alignInsertAssignments(attrs, assignments))
case other =>
throw new AnalysisException(s"Unexpected resolved action: $other")
throw new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_3052",
messageParameters = Map("other" -> other.toString))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper
case InsertAction(cond, assignments) =>
Keep(cond.getOrElse(TrueLiteral), assignments.map(_.value))
case other =>
throw new AnalysisException(s"Unexpected WHEN NOT MATCHED action: $other")
throw new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_3053",
messageParameters = Map("other" -> other.toString))
}

val outputs = notMatchedInstructions.flatMap(_.outputs)
Expand Down Expand Up @@ -440,7 +442,9 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper
Keep(cond.getOrElse(TrueLiteral), output)

case other =>
throw new AnalysisException(s"Unexpected action: $other")
throw new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_3052",
messageParameters = Map("other" -> other.toString))
}
}

Expand Down Expand Up @@ -472,7 +476,9 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper
Keep(cond.getOrElse(TrueLiteral), output)

case other =>
throw new AnalysisException(s"Unexpected action: $other")
throw new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_3052",
messageParameters = Map("other" -> other.toString))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ object V2ExpressionUtils extends SQLConfHelper with Logging {
query: LogicalPlan,
funCatalogOpt: Option[FunctionCatalog] = None): Expression =
toCatalystOpt(expr, query, funCatalogOpt)
.getOrElse(throw new AnalysisException(s"$expr is not currently supported"))
.getOrElse(throw new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_3054", messageParameters = Map("expr" -> expr.toString)))

def toCatalystOpt(
expr: V2Expression,
Expand All @@ -88,7 +89,9 @@ object V2ExpressionUtils extends SQLConfHelper with Logging {
case ref: FieldReference =>
Some(resolveRef[NamedExpression](ref, query))
case _ =>
throw new AnalysisException(s"$expr is not currently supported")
throw new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_3054",
messageParameters = Map("expr" -> expr.toString))
}
}

Expand Down Expand Up @@ -176,8 +179,9 @@ object V2ExpressionUtils extends SQLConfHelper with Logging {
case Some(_) =>
ApplyFunctionExpression(scalarFunc, arguments)
case _ =>
throw new AnalysisException(s"ScalarFunction '${scalarFunc.name()}'" +
s" neither implement magic method nor override 'produceResult'")
throw new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_3055",
messageParameters = Map("scalarFunc" -> scalarFunc.name()))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,10 @@ object GroupBasedRowLevelOperation {

case other =>
throw new AnalysisException(
s"Unexpected row-level read relations (allow multiple = $allowMultipleReads): $other")
errorClass = "_LEGACY_ERROR_TEMP_3056",
messageParameters = Map(
"allowMultipleReads" -> allowMultipleReads.toString,
"other" -> other.toString))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,9 @@ case class ReplaceData(
case DataSourceV2Relation(RowLevelOperationTable(_, operation), _, _, _, _) =>
operation
case _ =>
throw new AnalysisException(s"Cannot retrieve row-level operation from $table")
throw new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_3057",
messageParameters = Map("table" -> table.toString))
}
}

Expand Down Expand Up @@ -313,7 +315,9 @@ case class WriteDelta(
case DataSourceV2Relation(RowLevelOperationTable(_, operation), _, _, _, _) =>
operation.asInstanceOf[SupportsDelta]
case _ =>
throw new AnalysisException(s"Cannot retrieve row-level operation from $table")
throw new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_3057",
messageParameters = Map("table" -> table.toString))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,10 @@ private[spark] object SchemaUtils {
case (x, ys) if ys.length > 1 => s"${x._2.mkString(".")}"
}
throw new AnalysisException(
s"Found duplicate column(s) $checkType: ${duplicateColumns.mkString(", ")}")
errorClass = "_LEGACY_ERROR_TEMP_3058",
messageParameters = Map(
"checkType" -> checkType,
"duplicateColumns" -> duplicateColumns.mkString(", ")))
}
}

Expand Down Expand Up @@ -225,9 +228,11 @@ private[spark] object SchemaUtils {
case o =>
if (column.length > 1) {
throw new AnalysisException(
s"""Expected $columnPath to be a nested data type, but found $o. Was looking for the
|index of ${UnresolvedAttribute(column).name} in a nested field
""".stripMargin)
errorClass = "_LEGACY_ERROR_TEMP_3062",
messageParameters = Map(
"columnPath" -> columnPath,
"o" -> o.toString,
"attr" -> UnresolvedAttribute(column).name))
}
Nil
}
Expand All @@ -239,9 +244,12 @@ private[spark] object SchemaUtils {
} catch {
case i: IndexOutOfBoundsException =>
throw new AnalysisException(
s"Couldn't find column ${i.getMessage} in:\n${schema.treeString}")
errorClass = "_LEGACY_ERROR_TEMP_3060",
messageParameters = Map("i" -> i.getMessage, "schema" -> schema.treeString))
case e: AnalysisException =>
throw new AnalysisException(e.getMessage + s":\n${schema.treeString}")
throw new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_3061",
messageParameters = Map("e" -> e.getMessage, "schema" -> schema.treeString))
}
}

Expand All @@ -261,7 +269,10 @@ private[spark] object SchemaUtils {
(nameAndField._1 :+ nowField.name) -> nowField
case _ =>
throw new AnalysisException(
s"The positions provided ($pos) cannot be resolved in\n${schema.treeString}.")
errorClass = "_LEGACY_ERROR_TEMP_3059",
messageParameters = Map(
"pos" -> pos.toString,
"schema" -> schema.treeString))
}
}
field._1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ class RelationalGroupedDataset protected[sql](
*/
def pivot(pivotColumn: Column): RelationalGroupedDataset = {
if (df.isStreaming) {
throw new AnalysisException("pivot is not supported on a streaming DataFrames/Datasets")
throw new AnalysisException(errorClass = "_LEGACY_ERROR_TEMP_3063")
}
// This is to prevent unintended OOM errors when the number of distinct values is large
val maxValues = df.sparkSession.sessionState.conf.dataFramePivotMaxValues
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,10 @@ abstract class JdbcDialect extends Serializable with Logging {
* @return `AnalysisException` or its sub-class.
*/
def classifyException(message: String, e: Throwable): AnalysisException = {
new AnalysisException(message, cause = Some(e))
new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_3064",
messageParameters = Map("msg" -> message),
cause = Some(e))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
case o => o
}
throw new AnalysisException(
e.getClass.getCanonicalName + ": " + e.getMessage, cause = Some(e))
errorClass = "_LEGACY_ERROR_TEMP_3065",
messageParameters = Map(
"clazz" -> e.getClass.getCanonicalName,
"msg" -> e.getMessage),
cause = Some(e))
}
}

Expand Down

0 comments on commit 826e051

Please sign in to comment.