[SPARK-35021][SQL] Group exception messages in connector/catalog #32377

Closed · wants to merge 6 commits
Changes from all commits
CatalogV2Implicits.scala

@@ -17,12 +17,12 @@

 package org.apache.spark.sql.connector.catalog
 
-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.quoteIfNeeded
 import org.apache.spark.sql.connector.expressions.{BucketTransform, IdentityTransform, LogicalExpressions, Transform}
+import org.apache.spark.sql.errors.QueryCompilationErrors
 
 /**
  * Conversion helpers for working with v2 [[CatalogPlugin]].
@@ -39,8 +39,7 @@ private[sql] object CatalogV2Implicits {
   implicit class BucketSpecHelper(spec: BucketSpec) {
     def asTransform: BucketTransform = {
       if (spec.sortColumnNames.nonEmpty) {
-        throw new AnalysisException(
-          s"Cannot convert bucketing with sort columns to a transform: $spec")
+        throw QueryCompilationErrors.cannotConvertBucketWithSortColumnsToTransformError(spec)
       }
 
       val references = spec.bucketColumnNames.map(col => reference(Seq(col)))
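
For context, a minimal usage sketch of the helper above (call sites assumed for illustration, not part of the diff): a plain bucket spec converts to a bucket transform, while a spec that also sorts now fails through the shared error method.

import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

BucketSpec(4, Seq("id"), Nil).asTransform        // BucketTransform: bucket(4, id)
BucketSpec(4, Seq("id"), Seq("ts")).asTransform  // AnalysisException:
// "Cannot convert bucketing with sort columns to a transform: ..."
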
@@ -53,14 +52,13 @@ private[sql] object CatalogV2Implicits {
     val (idTransforms, nonIdTransforms) = transforms.partition(_.isInstanceOf[IdentityTransform])
 
     if (nonIdTransforms.nonEmpty) {
-      throw new AnalysisException("Transforms cannot be converted to partition columns: " +
-        nonIdTransforms.map(_.describe).mkString(", "))
+      throw QueryCompilationErrors.cannotConvertTransformsToPartitionColumnsError(nonIdTransforms)
     }
 
     idTransforms.map(_.asInstanceOf[IdentityTransform]).map(_.reference).map { ref =>
       val parts = ref.fieldNames
       if (parts.size > 1) {
-        throw new AnalysisException(s"Cannot partition by nested column: $ref")
+        throw QueryCompilationErrors.cannotPartitionByNestedColumnError(ref)
       } else {
         parts(0)
       }
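
Similarly for the partition-column conversion (a sketch assuming this file's TransformHelper exposes the surrounding method as asPartitionColumns): identity transforms over top-level columns succeed, nested references are rejected.

import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.expressions.LogicalExpressions

// A top-level identity transform becomes a partition column name: Seq("ts")
Seq(LogicalExpressions.identity(LogicalExpressions.reference(Seq("ts")))).asPartitionColumns
// AnalysisException: "Cannot partition by nested column: point.x"
Seq(LogicalExpressions.identity(LogicalExpressions.reference(Seq("point", "x")))).asPartitionColumns
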
@@ -73,15 +71,14 @@ private[sql] object CatalogV2Implicits {
       case tableCatalog: TableCatalog =>
         tableCatalog
       case _ =>
-        throw new AnalysisException(s"Cannot use catalog ${plugin.name}: not a TableCatalog")
+        throw QueryCompilationErrors.cannotUseCatalogError(plugin, "not a TableCatalog")
     }
 
     def asNamespaceCatalog: SupportsNamespaces = plugin match {
       case namespaceCatalog: SupportsNamespaces =>
         namespaceCatalog
       case _ =>
-        throw new AnalysisException(
-          s"Cannot use catalog ${plugin.name}: does not support namespaces")
+        throw QueryCompilationErrors.cannotUseCatalogError(plugin, "does not support namespaces")
     }
 
     def isFunctionCatalog: Boolean = plugin match {
@@ -93,8 +90,7 @@ private[sql] object CatalogV2Implicits {
       case functionCatalog: FunctionCatalog =>
         functionCatalog
       case _ =>
-        throw new AnalysisException(
-          s"Cannot use catalog '${plugin.name}': not a FunctionCatalog")
+        throw QueryCompilationErrors.cannotUseCatalogError(plugin, "not a FunctionCatalog")
     }
   }

@@ -128,22 +124,22 @@ private[sql] object CatalogV2Implicits {
       case ns if ns.isEmpty => TableIdentifier(ident.name)
       case Array(dbName) => TableIdentifier(ident.name, Some(dbName))
       case _ =>
-        throw new AnalysisException(
-          s"$quoted is not a valid TableIdentifier as it has more than 2 name parts.")
+        throw QueryCompilationErrors.identifierHavingMoreThanTwoNamePartsError(
+          quoted, "TableIdentifier")
     }

     def asFunctionIdentifier: FunctionIdentifier = ident.namespace() match {
       case ns if ns.isEmpty => FunctionIdentifier(ident.name())
       case Array(dbName) => FunctionIdentifier(ident.name(), Some(dbName))
       case _ =>
-        throw new AnalysisException(
-          s"$quoted is not a valid FunctionIdentifier as it has more than 2 name parts.")
+        throw QueryCompilationErrors.identifierHavingMoreThanTwoNamePartsError(
+          quoted, "FunctionIdentifier")
     }
   }

   implicit class MultipartIdentifierHelper(parts: Seq[String]) {
     if (parts.isEmpty) {
-      throw new AnalysisException("multi-part identifier cannot be empty.")
+      throw QueryCompilationErrors.emptyMultipartIdentifierError()
     }
 
     def asIdentifier: Identifier = Identifier.of(parts.init.toArray, parts.last)
@@ -152,16 +148,16 @@ private[sql] object CatalogV2Implicits {
       case Seq(tblName) => TableIdentifier(tblName)
       case Seq(dbName, tblName) => TableIdentifier(tblName, Some(dbName))
       case _ =>
-        throw new AnalysisException(
-          s"$quoted is not a valid TableIdentifier as it has more than 2 name parts.")
+        throw QueryCompilationErrors.identifierHavingMoreThanTwoNamePartsError(
+          quoted, "TableIdentifier")
     }
 
     def asFunctionIdentifier: FunctionIdentifier = parts match {
       case Seq(funcName) => FunctionIdentifier(funcName)
       case Seq(dbName, funcName) => FunctionIdentifier(funcName, Some(dbName))
       case _ =>
-        throw new AnalysisException(
-          s"$quoted is not a valid FunctionIdentifier as it has more than 2 name parts.")
+        throw QueryCompilationErrors.identifierHavingMoreThanTwoNamePartsError(
+          quoted, "FunctionIdentifier")
     }
 
     def quoted: String = parts.map(quoteIfNeeded).mkString(".")
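
Taken together, the identifier helpers in this file now fail through one code path. A minimal sketch of the two-name-part rule (usage assumed, not part of the diff):

import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

Seq("db", "tbl").asTableIdentifier  // TableIdentifier("tbl", Some("db"))
Seq("cat", "db", "tbl").asTableIdentifier
// AnalysisException: "cat.db.tbl is not a valid TableIdentifier as it has more than 2 name parts."
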
CatalogV2Util.scala

@@ -22,10 +22,10 @@ import java.util.Collections

 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, UnresolvedV2Relation}
 import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, CreateTableAsSelectStatement, CreateTableStatement, ReplaceTableAsSelectStatement, ReplaceTableStatement, SerdeInfo}
 import org.apache.spark.sql.connector.catalog.TableChange._
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.types.{ArrayType, DataType, MapType, NullType, StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap

@@ -386,8 +386,7 @@ private[sql] object CatalogV2Util {
       case _ => dt.isInstanceOf[NullType]
     }
     if (containsNullType(dt)) {
-      throw new AnalysisException(
-        s"Cannot create tables with ${NullType.simpleString} type.")
+      throw QueryCompilationErrors.cannotCreateTablesWithNullTypeError()
     }
   }

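
Only the last case of the null-type check is visible in the hunk above. A self-contained sketch of the whole recursive test (reconstructed for illustration; only the final case is confirmed by the diff):

import org.apache.spark.sql.types._

// Does the type contain NullType anywhere inside arrays, maps, or struct fields?
def containsNullType(dt: DataType): Boolean = dt match {
  case ArrayType(et, _) => containsNullType(et)
  case MapType(kt, vt, _) => containsNullType(kt) || containsNullType(vt)
  case StructType(fields) => fields.exists(f => containsNullType(f.dataType))
  case _ => dt.isInstanceOf[NullType]
}

containsNullType(ArrayType(NullType))  // true, so table creation is rejected
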
LookupCatalog.scala

@@ -18,8 +18,8 @@
 package org.apache.spark.sql.connector.catalog
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 
 /**
@@ -192,11 +192,11 @@ private[sql] trait LookupCatalog extends Logging {
         ident.namespace match {
           case Array(db) => FunctionIdentifier(ident.name, Some(db))
           case _ =>
-            throw new AnalysisException(s"Unsupported function name '$ident'")
+            throw QueryCompilationErrors.unsupportedFunctionNameError(ident.toString)
         }
       }
 
-      case _ => throw new AnalysisException("function is only supported in v1 catalog")
+      case _ => throw QueryCompilationErrors.functionUnsupportedInV2CatalogError()
     }
   }
 }
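
The rule enforced here, sketched standalone (hypothetical helper for illustration; the single-name case is handled earlier in the real method): a v2 identifier maps onto a v1 FunctionIdentifier only when its namespace has at most one part, because v1 can carry just a database name.

import org.apache.spark.sql.catalyst.FunctionIdentifier

def toV1(name: String, namespace: Array[String]): FunctionIdentifier = namespace match {
  case Array() => FunctionIdentifier(name)
  case Array(db) => FunctionIdentifier(name, Some(db))
  case _ => sys.error(s"Unsupported function name '${(namespace :+ name).mkString(".")}'")
}

toV1("f", Array("db"))         // FunctionIdentifier: db.f
toV1("f", Array("cat", "db"))  // fails, mirroring unsupportedFunctionNameError
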
QueryCompilationErrors.scala

@@ -22,17 +22,18 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, ResolvedNamespace, ResolvedTable, ResolvedView, TableAlreadyExistsException}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, InvalidUDFClassException}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, InvalidUDFClassException}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CreateMap, Expression, GroupingID, NamedExpression, SpecifiedWindowFrame, WindowFrame, WindowFunction, WindowSpecDefinition}
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, LogicalPlan, SerdeInfo}
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.catalyst.util.{toPrettySQL, FailFastMode, ParseMode, PermissiveMode}
-import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange, Table, TableCapability, TableChange, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, NamespaceChange, Table, TableCapability, TableChange, V1Table}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 import org.apache.spark.sql.connector.expressions.{NamedReference, Transform}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.streaming.OutputMode
-import org.apache.spark.sql.types.{AbstractDataType, DataType, StructField, StructType}
+import org.apache.spark.sql.types.{AbstractDataType, DataType, NullType, StructField, StructType}
 
 /**
  * Object for grouping error messages from exceptions thrown during query compilation.

@@ -1355,4 +1356,39 @@ private[spark] object QueryCompilationErrors {
   def cannotUseIntervalTypeInTableSchemaError(): Throwable = {
     new AnalysisException("Cannot use interval type in the table schema.")
   }
+
+  def cannotConvertBucketWithSortColumnsToTransformError(spec: BucketSpec): Throwable = {
+    new AnalysisException(
+      s"Cannot convert bucketing with sort columns to a transform: $spec")
+  }
+
+  def cannotConvertTransformsToPartitionColumnsError(nonIdTransforms: Seq[Transform]): Throwable = {
+    new AnalysisException("Transforms cannot be converted to partition columns: " +
+      nonIdTransforms.map(_.describe).mkString(", "))
+  }
+
+  def cannotPartitionByNestedColumnError(reference: NamedReference): Throwable = {
+    new AnalysisException(s"Cannot partition by nested column: $reference")
+  }
+
+  def cannotUseCatalogError(plugin: CatalogPlugin, msg: String): Throwable = {
+    new AnalysisException(s"Cannot use catalog ${plugin.name}: $msg")
+  }
+
+  def identifierHavingMoreThanTwoNamePartsError(
+      quoted: String, identifier: String): Throwable = {
+    new AnalysisException(s"$quoted is not a valid $identifier as it has more than 2 name parts.")
+  }
+
+  def emptyMultipartIdentifierError(): Throwable = {
+    new AnalysisException("multi-part identifier cannot be empty.")
+  }
+
+  def cannotCreateTablesWithNullTypeError(): Throwable = {
+    new AnalysisException(s"Cannot create tables with ${NullType.simpleString} type.")
+  }
+
+  def functionUnsupportedInV2CatalogError(): Throwable = {
+    new AnalysisException("function is only supported in v1 catalog")
+  }
 }
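
One payoff of grouping the messages, shown as a test-style sketch (assertion style assumed, not taken from this PR): every call site now shares a single construction point, so one assertion pins the message for all of them.

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.errors.QueryCompilationErrors

val e = QueryCompilationErrors.emptyMultipartIdentifierError()
assert(e.isInstanceOf[AnalysisException])
assert(e.getMessage.contains("multi-part identifier cannot be empty"))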