Skip to content

Commit

Permalink
[SPARK-49877][SQL] Change classifyException function signature: add i…
Browse files Browse the repository at this point in the history
…sRuntime argument

### What changes were proposed in this pull request?
The proposal is to update the classifyException function so that it can return either ```AnalysisException``` or ```SparkRuntimeException```. This is achieved by adding a new parameter, ```isRuntime```, and modifying the return type to be ```Throwable with SparkThrowable``` for compatibility with both types.

### Why are the changes needed?
The changes are needed to allow the classifyException function to be used in execution part of the code.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Not needed.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #48351 from ivanjevtic-db/Change-classify-exception-function-signature.

Lead-authored-by: ivanjevtic-db <[email protected]>
Co-authored-by: Ivan Jevtic <[email protected]>
Co-authored-by: milastdbx <[email protected]>
Co-authored-by: Maxim Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
  • Loading branch information
3 people committed Oct 7, 2024
1 parent be546ff commit 5132ab1
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1262,13 +1262,14 @@ object JdbcUtils extends Logging with SQLConfHelper {
errorClass: String,
messageParameters: Map[String, String],
dialect: JdbcDialect,
description: String)(f: => T): T = {
description: String,
isRuntime: Boolean)(f: => T): T = {
try {
f
} catch {
case e: SparkThrowable with Throwable => throw e
case e: Throwable =>
throw dialect.classifyException(e, errorClass, messageParameters, description)
throw dialect.classifyException(e, errorClass, messageParameters, description, isRuntime)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt
"indexName" -> toSQLId(indexName),
"tableName" -> toSQLId(name)),
dialect = JdbcDialects.get(jdbcOptions.url),
description = s"Failed to create index $indexName in ${name()}") {
description = s"Failed to create index $indexName in ${name()}",
isRuntime = false) {
JdbcUtils.createIndex(
conn, indexName, ident, columns, columnsProperties, properties, jdbcOptions)
}
Expand All @@ -92,7 +93,8 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt
"indexName" -> toSQLId(indexName),
"tableName" -> toSQLId(name)),
dialect = JdbcDialects.get(jdbcOptions.url),
description = s"Failed to drop index $indexName in ${name()}") {
description = s"Failed to drop index $indexName in ${name()}",
isRuntime = false) {
JdbcUtils.dropIndex(conn, indexName, ident, jdbcOptions)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ class JDBCTableCatalog extends TableCatalog
"url" -> options.getRedactUrl(),
"namespace" -> toSQLId(namespace.toSeq)),
dialect,
description = s"Failed get tables from: ${namespace.mkString(".")}") {
description = s"Failed get tables from: ${namespace.mkString(".")}",
isRuntime = false) {
conn.getMetaData.getTables(null, schemaPattern, "%", Array("TABLE"))
}
new Iterator[Identifier] {
Expand All @@ -93,7 +94,8 @@ class JDBCTableCatalog extends TableCatalog
"url" -> options.getRedactUrl(),
"tableName" -> toSQLId(ident)),
dialect,
description = s"Failed table existence check: $ident") {
description = s"Failed table existence check: $ident",
isRuntime = false) {
JdbcUtils.withConnection(options)(JdbcUtils.tableExists(_, writeOptions))
}
}
Expand All @@ -120,7 +122,8 @@ class JDBCTableCatalog extends TableCatalog
"oldName" -> toSQLId(oldIdent),
"newName" -> toSQLId(newIdent)),
dialect,
description = s"Failed table renaming from $oldIdent to $newIdent") {
description = s"Failed table renaming from $oldIdent to $newIdent",
isRuntime = false) {
JdbcUtils.renameTable(conn, oldIdent, newIdent, options)
}
}
Expand All @@ -136,7 +139,8 @@ class JDBCTableCatalog extends TableCatalog
"url" -> options.getRedactUrl(),
"tableName" -> toSQLId(ident)),
dialect,
description = s"Failed to load table: $ident"
description = s"Failed to load table: $ident",
isRuntime = false
) {
val schema = JDBCRDD.resolveTable(optionsWithTableName)
JDBCTable(ident, schema, optionsWithTableName)
Expand Down Expand Up @@ -192,7 +196,8 @@ class JDBCTableCatalog extends TableCatalog
"url" -> options.getRedactUrl(),
"tableName" -> toSQLId(ident)),
dialect,
description = s"Failed table creation: $ident") {
description = s"Failed table creation: $ident",
isRuntime = false) {
JdbcUtils.createTable(conn, getTableName(ident), schema, caseSensitive, writeOptions)
}
}
Expand All @@ -209,7 +214,8 @@ class JDBCTableCatalog extends TableCatalog
"url" -> options.getRedactUrl(),
"tableName" -> toSQLId(ident)),
dialect,
description = s"Failed table altering: $ident") {
description = s"Failed table altering: $ident",
isRuntime = false) {
JdbcUtils.alterTable(conn, getTableName(ident), changes, options)
}
loadTable(ident)
Expand All @@ -225,7 +231,8 @@ class JDBCTableCatalog extends TableCatalog
"url" -> options.getRedactUrl(),
"namespace" -> toSQLId(namespace.toSeq)),
dialect,
description = s"Failed namespace exists: ${namespace.mkString}") {
description = s"Failed namespace exists: ${namespace.mkString}",
isRuntime = false) {
JdbcUtils.schemaExists(conn, options, db)
}
}
Expand All @@ -238,7 +245,8 @@ class JDBCTableCatalog extends TableCatalog
errorClass = "FAILED_JDBC.LIST_NAMESPACES",
messageParameters = Map("url" -> options.getRedactUrl()),
dialect,
description = s"Failed list namespaces") {
description = s"Failed list namespaces",
isRuntime = false) {
JdbcUtils.listSchemas(conn, options)
}
}
Expand Down Expand Up @@ -292,7 +300,8 @@ class JDBCTableCatalog extends TableCatalog
"url" -> options.getRedactUrl(),
"namespace" -> toSQLId(db)),
dialect,
description = s"Failed create name space: $db") {
description = s"Failed create name space: $db",
isRuntime = false) {
JdbcUtils.createSchema(conn, options, db, comment)
}
}
Expand All @@ -317,7 +326,8 @@ class JDBCTableCatalog extends TableCatalog
"url" -> options.getRedactUrl(),
"namespace" -> toSQLId(db)),
dialect,
description = s"Failed create comment on name space: $db") {
description = s"Failed create comment on name space: $db",
isRuntime = false) {
JdbcUtils.alterSchemaComment(conn, options, db, set.value)
}
}
Expand All @@ -334,7 +344,8 @@ class JDBCTableCatalog extends TableCatalog
"url" -> options.getRedactUrl(),
"namespace" -> toSQLId(db)),
dialect,
description = s"Failed remove comment on name space: $db") {
description = s"Failed remove comment on name space: $db",
isRuntime = false) {
JdbcUtils.removeSchemaComment(conn, options, db)
}
}
Expand Down Expand Up @@ -362,7 +373,8 @@ class JDBCTableCatalog extends TableCatalog
"url" -> options.getRedactUrl(),
"namespace" -> toSQLId(db)),
dialect,
description = s"Failed drop name space: $db") {
description = s"Failed drop name space: $db",
isRuntime = false) {
JdbcUtils.dropSchema(conn, options, db, cascade)
true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ import java.util.Locale

import scala.util.control.NonFatal

import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.{SparkThrowable, SparkUnsupportedOperationException}
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException
import org.apache.spark.sql.connector.catalog.Identifier
Expand Down Expand Up @@ -153,12 +152,12 @@ private case class DB2Dialect() extends JdbcDialect with SQLConfHelper with NoLe
override def removeSchemaCommentQuery(schema: String): String = {
s"COMMENT ON SCHEMA ${quoteIdentifier(schema)} IS ''"
}

override def classifyException(
e: Throwable,
errorClass: String,
messageParameters: Map[String, String],
description: String): AnalysisException = {
description: String,
isRuntime: Boolean): Throwable with SparkThrowable = {
e match {
case sqlException: SQLException =>
sqlException.getSQLState match {
Expand All @@ -171,9 +170,10 @@ private case class DB2Dialect() extends JdbcDialect with SQLConfHelper with NoLe
case "42710" if errorClass == "FAILED_JDBC.RENAME_TABLE" =>
val newTable = messageParameters("newName")
throw QueryCompilationErrors.tableAlreadyExistsError(newTable)
case _ => super.classifyException(e, errorClass, messageParameters, description)
case _ =>
super.classifyException(e, errorClass, messageParameters, description, isRuntime)
}
case _ => super.classifyException(e, errorClass, messageParameters, description)
case _ => super.classifyException(e, errorClass, messageParameters, description, isRuntime)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ import scala.util.control.NonFatal

import org.apache.commons.lang3.StringUtils

import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.{SparkThrowable, SparkUnsupportedOperationException}
import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
Expand Down Expand Up @@ -200,7 +199,8 @@ private[sql] case class H2Dialect() extends JdbcDialect with NoLegacyJDBCError {
e: Throwable,
errorClass: String,
messageParameters: Map[String, String],
description: String): AnalysisException = {
description: String,
isRuntime: Boolean): Throwable with SparkThrowable = {
e match {
case exception: SQLException =>
// Error codes are from https://www.h2database.com/javadoc/org/h2/api/ErrorCode.html
Expand Down Expand Up @@ -244,7 +244,7 @@ private[sql] case class H2Dialect() extends JdbcDialect with NoLegacyJDBCError {
}
case _ => // do nothing
}
super.classifyException(e, errorClass, messageParameters, description)
super.classifyException(e, errorClass, messageParameters, description, isRuntime)
}

override def compileExpression(expr: Expression): Option[String] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import scala.util.control.NonFatal

import org.apache.commons.lang3.StringUtils

import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.{SparkRuntimeException, SparkThrowable, SparkUnsupportedOperationException}
import org.apache.spark.annotation.{DeveloperApi, Since}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
Expand Down Expand Up @@ -741,13 +741,15 @@ abstract class JdbcDialect extends Serializable with Logging {
* @param errorClass The error class assigned in the case of an unclassified `e`
* @param messageParameters The message parameters of `errorClass`
* @param description The error description
* @return `AnalysisException` or its sub-class.
* @param isRuntime Whether the exception is a runtime exception or not.
* @return `SparkThrowable + Throwable` or its sub-class.
*/
def classifyException(
e: Throwable,
errorClass: String,
messageParameters: Map[String, String],
description: String): AnalysisException = {
description: String,
isRuntime: Boolean): Throwable with SparkThrowable = {
classifyException(description, e)
}

Expand Down Expand Up @@ -850,11 +852,19 @@ trait NoLegacyJDBCError extends JdbcDialect {
e: Throwable,
errorClass: String,
messageParameters: Map[String, String],
description: String): AnalysisException = {
new AnalysisException(
errorClass = errorClass,
messageParameters = messageParameters,
cause = Some(e))
description: String,
isRuntime: Boolean): Throwable with SparkThrowable = {
if (isRuntime) {
new SparkRuntimeException(
errorClass = errorClass,
messageParameters = messageParameters,
cause = e)
} else {
new AnalysisException(
errorClass = errorClass,
messageParameters = messageParameters,
cause = Some(e))
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.Locale

import scala.util.control.NonFatal

import org.apache.spark.sql.AnalysisException
import org.apache.spark.SparkThrowable
import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.expressions.{Expression, NullOrdering, SortDirection}
Expand Down Expand Up @@ -207,7 +207,8 @@ private case class MsSqlServerDialect() extends JdbcDialect with NoLegacyJDBCErr
e: Throwable,
errorClass: String,
messageParameters: Map[String, String],
description: String): AnalysisException = {
description: String,
isRuntime: Boolean): Throwable with SparkThrowable = {
e match {
case sqlException: SQLException =>
sqlException.getErrorCode match {
Expand All @@ -219,9 +220,10 @@ private case class MsSqlServerDialect() extends JdbcDialect with NoLegacyJDBCErr
case 15335 if errorClass == "FAILED_JDBC.RENAME_TABLE" =>
val newTable = messageParameters("newName")
throw QueryCompilationErrors.tableAlreadyExistsError(newTable)
case _ => super.classifyException(e, errorClass, messageParameters, description)
case _ =>
super.classifyException(e, errorClass, messageParameters, description, isRuntime)
}
case _ => super.classifyException(e, errorClass, messageParameters, description)
case _ => super.classifyException(e, errorClass, messageParameters, description, isRuntime)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ import java.util.Locale
import scala.collection.mutable.ArrayBuilder
import scala.util.control.NonFatal

import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.{SparkThrowable, SparkUnsupportedOperationException}
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
import org.apache.spark.sql.connector.catalog.Identifier
Expand Down Expand Up @@ -353,7 +352,8 @@ private case class MySQLDialect() extends JdbcDialect with SQLConfHelper with No
e: Throwable,
errorClass: String,
messageParameters: Map[String, String],
description: String): AnalysisException = {
description: String,
isRuntime: Boolean): Throwable with SparkThrowable = {
e match {
case sqlException: SQLException =>
sqlException.getErrorCode match {
Expand All @@ -369,10 +369,11 @@ private case class MySQLDialect() extends JdbcDialect with SQLConfHelper with No
val indexName = messageParameters("indexName")
val tableName = messageParameters("tableName")
throw new NoSuchIndexException(indexName, tableName, cause = Some(e))
case _ => super.classifyException(e, errorClass, messageParameters, description)
case _ =>
super.classifyException(e, errorClass, messageParameters, description, isRuntime)
}
case unsupported: UnsupportedOperationException => throw unsupported
case _ => super.classifyException(e, errorClass, messageParameters, description)
case _ => super.classifyException(e, errorClass, messageParameters, description, isRuntime)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ import java.util.Locale

import scala.util.control.NonFatal

import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.{SparkThrowable, SparkUnsupportedOperationException}
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.connector.expressions.Expression
import org.apache.spark.sql.errors.QueryCompilationErrors
Expand Down Expand Up @@ -236,16 +235,18 @@ private case class OracleDialect() extends JdbcDialect with SQLConfHelper with N
e: Throwable,
errorClass: String,
messageParameters: Map[String, String],
description: String): AnalysisException = {
description: String,
isRuntime: Boolean): Throwable with SparkThrowable = {
e match {
case sqlException: SQLException =>
sqlException.getErrorCode match {
case 955 if errorClass == "FAILED_JDBC.RENAME_TABLE" =>
val newTable = messageParameters("newName")
throw QueryCompilationErrors.tableAlreadyExistsError(newTable)
case _ => super.classifyException(e, errorClass, messageParameters, description)
case _ =>
super.classifyException(e, errorClass, messageParameters, description, isRuntime)
}
case _ => super.classifyException(e, errorClass, messageParameters, description)
case _ => super.classifyException(e, errorClass, messageParameters, description, isRuntime)
}
}
}
Expand Down
Loading

0 comments on commit 5132ab1

Please sign in to comment.