[SPARK-44321][CONNECT] Decouple ParseException from AnalysisException
### What changes were proposed in this pull request?
This PR decouples ParseException from AnalysisException.

### Why are the changes needed?
We are moving (parts of) parsing to `sql/api` so that (data type) parsing can be shared between Connect and SQL.

### Does this PR introduce _any_ user-facing change?
Yes and no. This is a breaking change in the sense that the parent class of `ParseException` changes from `AnalysisException` to `SparkException`.
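
For code that catches parse errors by type, here is a minimal before/after sketch (assuming an active `spark` session; the query is a deliberate syntax error):

```scala
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.parser.ParseException

try {
  spark.sql("SELEC 1")  // deliberate syntax error
} catch {
  // Unchanged: the concrete type thrown by the parser is still ParseException.
  case e: ParseException =>
    println(e.getErrorClass)  // PARSE_SYNTAX_ERROR
  // Changed: a handler for the parent type must now use SparkException;
  // `case e: AnalysisException` no longer matches parse errors.
  case e: SparkException =>
    println(e.getMessage)
}
```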

### How was this patch tested?
Existing tests.

Closes #41879 from hvanhovell/SPARK-44321.

Authored-by: Herman van Hovell <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
hvanhovell committed Jul 8, 2023
1 parent 57bbb4c commit fdeb8d8
Showing 25 changed files with 110 additions and 77 deletions.
1 change: 1 addition & 0 deletions docs/sql-migration-guide.md
@@ -28,6 +28,7 @@ license: |
- Since Spark 3.5, Spark thrift server will interrupt task when canceling a running statement. To restore the previous behavior, set `spark.sql.thriftServer.interruptOnCancel` to `false`.
- Since Spark 3.5, the Avro data source will throw `AnalysisException` when reading Interval types as Date or Timestamp types, or when reading Decimal types with lower precision. To restore the legacy behavior, set `spark.sql.legacy.avro.allowIncompatibleSchema` to `true`.
- Since Spark 3.5, Row's json and prettyJson methods are moved to `ToJsonUtil`.
- Since Spark 3.5, `ParseException` is a subclass of `SparkException` instead of `AnalysisException`.

## Upgrading from Spark SQL 3.3 to 3.4

@@ -42,6 +42,13 @@ case class Origin(
}
}

/**
* Helper trait for objects that can be traced back to an [[Origin]].
*/
trait WithOrigin {
def origin: Origin
}

/**
* Provides a location for TreeNodes to ask about the context of their origin. For example, which
* line of code is currently being parsed.
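The new trait is deliberately tiny: any object that can point back to a position in the input implements the single `origin` accessor. A hypothetical sketch (the `PositionedError` class is illustrative, not part of this change):

```scala
import org.apache.spark.sql.catalyst.trees.{Origin, WithOrigin}

// Hypothetical example: a positioned error implementing the shared trait.
class PositionedError(message: String, val origin: Origin)
  extends Exception(message) with WithOrigin

val err = new PositionedError("bad token", Origin(line = Some(1), startPosition = Some(7)))
println(err.origin.startPosition)  // Some(7)
```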
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.{QueryContext, SparkThrowable, SparkThrowableHelper}
import org.apache.spark.annotation.Stable
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.Origin
import org.apache.spark.sql.catalyst.trees.{Origin, WithOrigin}

/**
* Thrown when a query fails to analyze, usually because the query itself is invalid.
@@ -40,7 +40,7 @@ class AnalysisException protected[sql] (
val errorClass: Option[String] = None,
val messageParameters: Map[String, String] = Map.empty,
val context: Array[QueryContext] = Array.empty)
extends Exception(message, cause.orNull) with SparkThrowable with Serializable {
extends Exception(message, cause.orNull) with SparkThrowable with Serializable with WithOrigin {

def this(
errorClass: String,
@@ -139,4 +139,6 @@ class AnalysisException protected[sql] (
override def getErrorClass: String = errorClass.orNull

override def getQueryContext: Array[QueryContext] = context

override lazy val origin: Origin = Origin(line, startPosition)
}
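With the mix-in, an analysis error exposes its position through the same `origin` accessor that tree nodes use, backed by the existing `line`/`startPosition` fields. A sketch (the table name is hypothetical):

```scala
import org.apache.spark.sql.AnalysisException

try {
  spark.sql("SELECT * FROM missing_tbl")  // hypothetical missing table
} catch {
  case e: AnalysisException =>
    // The new lazy val is derived from the existing fields.
    assert(e.origin.line == e.line)
    assert(e.origin.startPosition == e.startPosition)
}
```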
@@ -16,15 +16,17 @@
*/
package org.apache.spark.sql.catalyst.parser

import scala.collection.JavaConverters._

import org.antlr.v4.runtime._
import org.antlr.v4.runtime.atn.PredictionMode
import org.antlr.v4.runtime.misc.{Interval, ParseCancellationException}
import org.antlr.v4.runtime.tree.TerminalNodeImpl

import org.apache.spark.{QueryContext, SparkThrowableHelper}
import org.apache.spark.{QueryContext, SparkException, SparkThrowable, SparkThrowableHelper}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, SqlApiConf}
import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, Origin}
import org.apache.spark.sql.SqlApiConf
import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, Origin, WithOrigin}
import org.apache.spark.sql.errors.QueryParsingErrors
import org.apache.spark.sql.types.{DataType, StructType}

@@ -92,10 +94,15 @@ abstract class AbstractParser extends DataTypeParserInterface with Logging {
throw e
case e: ParseException =>
throw e.withCommand(command)
case e: AnalysisException =>
val position = Origin(e.line, e.startPosition)
throw new ParseException(Option(command), e.message, position, position,
e.errorClass, e.messageParameters)
case e: SparkThrowable with WithOrigin =>
throw new ParseException(
command = Option(command),
message = e.getMessage,
start = e.origin,
stop = e.origin,
errorClass = Option(e.getErrorClass),
messageParameters = e.getMessageParameters.asScala.toMap,
queryContext = e.getQueryContext)
}
}

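Note that the rewritten clause matches on the intersection type `SparkThrowable with WithOrigin` rather than on `AnalysisException`. Since `AnalysisException` now mixes in `WithOrigin` (see above), it still matches; the parser module just no longer has to name it. A minimal sketch of the idiom (hypothetical missing table, active `spark` session):

```scala
import org.apache.spark.SparkThrowable
import org.apache.spark.sql.catalyst.trees.WithOrigin

// A handler written against the intersection type catches any positioned
// Spark error, including AnalysisException, without naming the class.
try {
  spark.sql("SELECT * FROM no_such_table")
} catch {
  case e: SparkThrowable with WithOrigin =>
    println(s"${e.getErrorClass} at ${e.origin}")
}
```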
@@ -141,7 +148,7 @@ private[parser] class UpperCaseCharStream(wrapped: CodePointCharStream) extends
}

/**
* The ParseErrorListener converts parse errors into AnalysisExceptions.
* The ParseErrorListener converts parse errors into ParseExceptions.
*/
case object ParseErrorListener extends BaseErrorListener {
override def syntaxError(
Expand Down Expand Up @@ -171,25 +178,23 @@ case object ParseErrorListener extends BaseErrorListener {
}

/**
* A [[ParseException]] is an [[AnalysisException]] that is thrown during the parse process. It
* A [[ParseException]] is a [[SparkException]] that is thrown during the parse process. It
* contains fields and an extended error message that make reporting and diagnosing errors easier.
*/
class ParseException(
val command: Option[String],
message: String,
val message: String,
val start: Origin,
val stop: Origin,
errorClass: Option[String] = None,
messageParameters: Map[String, String] = Map.empty,
queryContext: Array[QueryContext] = ParseException.getQueryContext())
extends AnalysisException(
val errorClass: Option[String] = None,
val messageParameters: Map[String, String] = Map.empty,
val queryContext: Array[QueryContext] = ParseException.getQueryContext())
extends SparkException(
message,
start.line,
start.startPosition,
None,
None,
cause = null,
errorClass,
messageParameters) {
messageParameters,
queryContext) {

def this(errorClass: String, messageParameters: Map[String, String], ctx: ParserRuleContext) =
this(Option(ParserUtils.command(ctx)),
@@ -216,6 +221,14 @@ class ParseException(
Some(errorClass),
messageParameters)

// Methods added to retain compatibility with AnalysisException.
@deprecated("Use start.line instead.")
def line: Option[Int] = start.line
@deprecated("Use start.startPosition instead.")
def startPosition: Option[Int] = start.startPosition
@deprecated("ParseException is never caused by another exception.")
def cause: Option[Throwable] = None

override def getMessage: String = {
val builder = new StringBuilder
builder ++= "\n" ++= message
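The deprecated shims keep the old `AnalysisException`-style accessors compiling for callers of a caught `ParseException`. A sketch (assuming an active `spark` session):

```scala
import org.apache.spark.sql.catalyst.parser.ParseException

try {
  spark.sql("SELEC 1")  // deliberate syntax error
} catch {
  case e: ParseException =>
    println(e.line)           // shim: delegates to e.start.line
    println(e.startPosition)  // shim: delegates to e.start.startPosition
    println(e.cause)          // shim: always None for parse errors
}
```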
@@ -61,11 +61,14 @@ object AlwaysProcess {
}

// scalastyle:off
abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with TreePatternBits {
abstract class TreeNode[BaseType <: TreeNode[BaseType]]
extends Product
with TreePatternBits
with WithOrigin {
// scalastyle:on
self: BaseType =>

val origin: Origin = CurrentOrigin.get
override val origin: Origin = CurrentOrigin.get

/**
* A mutable map for holding auxiliary information of this tree node. It will be carried over
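`TreeNode` already captured `CurrentOrigin` at construction time; declaring it through `WithOrigin` lets generic code accept any positioned object instead of `TreeNode[_]`. A sketch using `Literal` as an arbitrary tree node:

```scala
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, Origin, WithOrigin}

// A helper written against the trait now works for tree nodes and
// positioned exceptions alike.
def position(x: WithOrigin): String =
  s"line=${x.origin.line.getOrElse("?")} pos=${x.origin.startPosition.getOrElse("?")}"

val lit = CurrentOrigin.withOrigin(Origin(line = Some(3), startPosition = Some(10))) {
  Literal(1)  // TreeNode captures CurrentOrigin here
}
println(position(lit))  // line=3 pos=10
```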
@@ -36,7 +36,7 @@ org.apache.spark.sql.AnalysisException
-- !query
select from_csv('1', 'a InvalidType')
-- !query analysis
org.apache.spark.sql.AnalysisException
org.apache.spark.sql.catalyst.parser.ParseException
{
"errorClass" : "PARSE_SYNTAX_ERROR",
"sqlState" : "42601",
@@ -152,7 +152,7 @@ org.apache.spark.sql.AnalysisException
-- !query
select from_json('{"a":1}', 'a InvalidType')
-- !query analysis
org.apache.spark.sql.AnalysisException
org.apache.spark.sql.catalyst.parser.ParseException
{
"errorClass" : "PARSE_SYNTAX_ERROR",
"sqlState" : "42601",
@@ -42,7 +42,7 @@ select from_csv('1', 'a InvalidType')
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
org.apache.spark.sql.catalyst.parser.ParseException
{
"errorClass" : "PARSE_SYNTAX_ERROR",
"sqlState" : "42601",
@@ -172,7 +172,7 @@ select from_json('{"a":1}', 'a InvalidType')
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
org.apache.spark.sql.catalyst.parser.ParseException
{
"errorClass" : "PARSE_SYNTAX_ERROR",
"sqlState" : "42601",
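The golden-file updates above are user-visible: a malformed schema string passed to `from_json`/`from_csv` is handled by the shared data type parser and now surfaces as a `ParseException` with the same error class as before. A test-style sketch (assuming ScalaTest's `intercept` and an active `spark` session):

```scala
import org.apache.spark.sql.catalyst.parser.ParseException

// Previously this raised AnalysisException; the error class is unchanged.
val e = intercept[ParseException] {
  spark.sql("""select from_json('{"a":1}', 'a InvalidType')""").collect()
}
assert(e.getErrorClass == "PARSE_SYNTAX_ERROR")
```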
@@ -29,6 +29,7 @@ import org.apache.spark.{SparkException, SparkRuntimeException}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Literal, StructsToJson}
import org.apache.spark.sql.catalyst.expressions.Cast._
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -540,7 +541,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
)

checkError(
exception = intercept[AnalysisException] {
exception = intercept[ParseException] {
df3.selectExpr("""from_json(value, 'time InvalidType')""")
},
errorClass = "PARSE_SYNTAX_ERROR",
@@ -21,6 +21,7 @@ import org.scalactic.source.Position
import org.scalatest.Tag

import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, ExpressionSet}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.Aggregate
import org.apache.spark.sql.catalyst.trees.TreePattern.OUTER_REFERENCE
import org.apache.spark.sql.internal.SQLConf
@@ -939,7 +940,7 @@ class LateralColumnAliasSuite extends LateralColumnAliasSuiteBase {
lca = "`jy`", windowExprRegex = "\"sum.*\"")
// this is initially not supported
checkError(
exception = intercept[AnalysisException] {
exception = intercept[ParseException] {
sql("select name, dept, 1 as n, rank() over " +
"(partition by dept order by salary rows between n preceding and current row) as rank " +
s"from $testTable where dept in (1, 6)")
@@ -19,6 +19,7 @@ package org.apache.spark.sql

import java.time.{Instant, LocalDate, LocalDateTime, ZoneId}

import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -239,7 +240,7 @@ class ParametersSuite extends QueryTest with SharedSparkSession {
val sqlText = "CREATE VIEW v AS SELECT :p AS p"
val args = Map("p" -> 1)
checkError(
exception = intercept[AnalysisException] {
exception = intercept[ParseException] {
spark.sql(sqlText, args)
},
errorClass = "UNSUPPORTED_FEATURE.PARAMETER_MARKER_IN_UNEXPECTED_STATEMENT",
@@ -254,7 +255,7 @@ class ParametersSuite extends QueryTest with SharedSparkSession {
val sqlText = "CREATE VIEW v AS SELECT ? AS p"
val args = Array(1)
checkError(
exception = intercept[AnalysisException] {
exception = intercept[ParseException] {
spark.sql(sqlText, args)
},
errorClass = "UNSUPPORTED_FEATURE.PARAMETER_MARKER_IN_UNEXPECTED_STATEMENT",
@@ -269,7 +270,7 @@ class ParametersSuite extends QueryTest with SharedSparkSession {
val sqlText = "CREATE VIEW v AS WITH cte(a) AS (SELECT (SELECT :p) AS a) SELECT a FROM cte"
val args = Map("p" -> 1)
checkError(
exception = intercept[AnalysisException] {
exception = intercept[ParseException] {
spark.sql(sqlText, args)
},
errorClass = "UNSUPPORTED_FEATURE.PARAMETER_MARKER_IN_UNEXPECTED_STATEMENT",
@@ -284,7 +285,7 @@ class ParametersSuite extends QueryTest with SharedSparkSession {
val sqlText = "CREATE VIEW v AS WITH cte(a) AS (SELECT (SELECT ?) AS a) SELECT a FROM cte"
val args = Array(1)
checkError(
exception = intercept[AnalysisException] {
exception = intercept[ParseException] {
spark.sql(sqlText, args)
},
errorClass = "UNSUPPORTED_FEATURE.PARAMETER_MARKER_IN_UNEXPECTED_STATEMENT",
@@ -303,7 +304,7 @@ class ParametersSuite extends QueryTest with SharedSparkSession {
|SELECT a FROM cte)""".stripMargin
val args = Map("p" -> 1)
checkError(
exception = intercept[AnalysisException] {
exception = intercept[ParseException] {
spark.sql(sqlText, args)
},
errorClass = "UNSUPPORTED_FEATURE.PARAMETER_MARKER_IN_UNEXPECTED_STATEMENT",
@@ -322,7 +323,7 @@ class ParametersSuite extends QueryTest with SharedSparkSession {
|SELECT a FROM cte)""".stripMargin
val args = Array(1)
checkError(
exception = intercept[AnalysisException] {
exception = intercept[ParseException] {
spark.sql(sqlText, args)
},
errorClass = "UNSUPPORTED_FEATURE.PARAMETER_MARKER_IN_UNEXPECTED_STATEMENT",
@@ -19,6 +19,7 @@ package org.apache.spark.sql

import org.apache.spark.{SparkConf, SparkNumberFormatException, SparkThrowable}
import org.apache.spark.sql.catalyst.expressions.Hex
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.connector.catalog.InMemoryPartitionTableCatalog
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
@@ -391,7 +392,7 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {
withTable("t") {
sql(s"CREATE TABLE t(i STRING, c string) USING PARQUET PARTITIONED BY (c)")
checkError(
exception = intercept[AnalysisException] {
exception = intercept[ParseException] {
sql("INSERT OVERWRITE t PARTITION (c='2', C='3') VALUES (1)")
},
sqlState = None,
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.{GenericRow, Hex}
import org.apache.spark.sql.catalyst.expressions.Cast._
import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, Partial}
import org.apache.spark.sql.catalyst.optimizer.{ConvertToLocalRelation, NestedColumnAliasingSuite}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.{LocalLimit, Project, RepartitionByExpression, Sort}
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.execution.{CommandResultExec, UnionExec}
@@ -593,7 +594,7 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
}

test("Allow only a single WITH clause per query") {
intercept[AnalysisException] {
intercept[ParseException] {
sql(
"with q1 as (select * from testData) with q2 as (select * from q1) select * from q2")
}
@@ -1535,7 +1536,7 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
.save(path)

// We don't support creating a temporary table while specifying a database
intercept[AnalysisException] {
intercept[ParseException] {
spark.sql(
s"""
|CREATE TEMPORARY VIEW db.t
@@ -22,6 +22,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.SparkException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
import org.apache.spark.sql.connector.catalog.Table
@@ -129,10 +130,10 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
withTable(t) {
sql(s"CREATE TABLE $t (id int, point struct<x: double, y: double>) USING $v2Format")
val e1 =
intercept[AnalysisException](sql(s"ALTER TABLE $t ADD COLUMN data interval"))
intercept[ParseException](sql(s"ALTER TABLE $t ADD COLUMN data interval"))
assert(e1.getMessage.contains("Cannot use interval type in the table schema."))
val e2 =
intercept[AnalysisException](sql(s"ALTER TABLE $t ADD COLUMN point.z interval"))
intercept[ParseException](sql(s"ALTER TABLE $t ADD COLUMN point.z interval"))
assert(e2.getMessage.contains("Cannot use interval type in the table schema."))
}
}
@@ -320,7 +320,7 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {

test("error handling: fail if the temp view name contains the database prefix") {
// Fully qualified table name like "database.table" is not allowed for temporary view
val e = intercept[AnalysisException] {
val e = intercept[ParseException] {
sql("CREATE OR REPLACE TEMPORARY VIEW default.myabcdview AS SELECT * FROM jt")
}
assert(e.message.contains(
@@ -329,7 +329,7 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {

test("error handling: disallow IF NOT EXISTS for CREATE TEMPORARY VIEW") {
withTempView("myabcdview") {
val e = intercept[AnalysisException] {
val e = intercept[ParseException] {
sql("CREATE TEMPORARY VIEW IF NOT EXISTS myabcdview AS SELECT * FROM jt")
}
assert(e.message.contains("It is not allowed to define a TEMPORARY view with IF NOT EXISTS"))
@@ -483,7 +483,7 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {

sql("DROP VIEW testView")

val e = intercept[AnalysisException] {
val e = intercept[ParseException] {
sql("CREATE OR REPLACE VIEW IF NOT EXISTS testView AS SELECT id FROM jt")
}
assert(e.message.contains(