diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
index cc41d8ca9007f..39642fd541706 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
@@ -17,12 +17,12 @@
 
 package org.apache.spark.sql.connector.catalog
 
-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.quoteIfNeeded
 import org.apache.spark.sql.connector.expressions.{BucketTransform, IdentityTransform, LogicalExpressions, Transform}
+import org.apache.spark.sql.errors.QueryCompilationErrors
 
 /**
  * Conversion helpers for working with v2 [[CatalogPlugin]].
@@ -39,8 +39,7 @@ private[sql] object CatalogV2Implicits {
   implicit class BucketSpecHelper(spec: BucketSpec) {
     def asTransform: BucketTransform = {
       if (spec.sortColumnNames.nonEmpty) {
-        throw new AnalysisException(
-          s"Cannot convert bucketing with sort columns to a transform: $spec")
+        throw QueryCompilationErrors.cannotConvertBucketWithSortColumnsToTransformError(spec)
       }
 
       val references = spec.bucketColumnNames.map(col => reference(Seq(col)))
@@ -53,14 +52,13 @@ private[sql] object CatalogV2Implicits {
       val (idTransforms, nonIdTransforms) = transforms.partition(_.isInstanceOf[IdentityTransform])
 
       if (nonIdTransforms.nonEmpty) {
-        throw new AnalysisException("Transforms cannot be converted to partition columns: " +
-          nonIdTransforms.map(_.describe).mkString(", "))
+        throw QueryCompilationErrors.cannotConvertTransformsToPartitionColumnsError(nonIdTransforms)
       }
 
       idTransforms.map(_.asInstanceOf[IdentityTransform]).map(_.reference).map { ref =>
         val parts = ref.fieldNames
         if (parts.size > 1) {
-          throw new AnalysisException(s"Cannot partition by nested column: $ref")
+          throw QueryCompilationErrors.cannotPartitionByNestedColumnError(ref)
         } else {
           parts(0)
         }
@@ -73,15 +71,14 @@ private[sql] object CatalogV2Implicits {
      case tableCatalog: TableCatalog =>
        tableCatalog
      case _ =>
-        throw new AnalysisException(s"Cannot use catalog ${plugin.name}: not a TableCatalog")
+        throw QueryCompilationErrors.cannotUseCatalogError(plugin, "not a TableCatalog")
     }
 
     def asNamespaceCatalog: SupportsNamespaces = plugin match {
       case namespaceCatalog: SupportsNamespaces =>
         namespaceCatalog
       case _ =>
-        throw new AnalysisException(
-          s"Cannot use catalog ${plugin.name}: does not support namespaces")
+        throw QueryCompilationErrors.cannotUseCatalogError(plugin, "does not support namespaces")
     }
 
     def isFunctionCatalog: Boolean = plugin match {
@@ -93,8 +90,7 @@ private[sql] object CatalogV2Implicits {
       case functionCatalog: FunctionCatalog =>
         functionCatalog
       case _ =>
-        throw new AnalysisException(
-          s"Cannot use catalog '${plugin.name}': not a FunctionCatalog")
+        throw QueryCompilationErrors.cannotUseCatalogError(plugin, "not a FunctionCatalog")
     }
   }
 
@@ -128,22 +124,22 @@ private[sql] object CatalogV2Implicits {
       case ns if ns.isEmpty => TableIdentifier(ident.name)
       case Array(dbName) => TableIdentifier(ident.name, Some(dbName))
       case _ =>
-        throw new AnalysisException(
-          s"$quoted is not a valid TableIdentifier as it has more than 2 name parts.")
+        throw QueryCompilationErrors.identifierHavingMoreThanTwoNamePartsError(
+          quoted, "TableIdentifier")
     }
 
     def asFunctionIdentifier: FunctionIdentifier = ident.namespace() match {
       case ns if ns.isEmpty => FunctionIdentifier(ident.name())
       case Array(dbName) => FunctionIdentifier(ident.name(), Some(dbName))
       case _ =>
-        throw new AnalysisException(
-          s"$quoted is not a valid FunctionIdentifier as it has more than 2 name parts.")
+        throw QueryCompilationErrors.identifierHavingMoreThanTwoNamePartsError(
+          quoted, "FunctionIdentifier")
     }
   }
 
   implicit class MultipartIdentifierHelper(parts: Seq[String]) {
     if (parts.isEmpty) {
-      throw new AnalysisException("multi-part identifier cannot be empty.")
+      throw QueryCompilationErrors.emptyMultipartIdentifierError()
     }
 
     def asIdentifier: Identifier = Identifier.of(parts.init.toArray, parts.last)
@@ -152,16 +148,16 @@ private[sql] object CatalogV2Implicits {
       case Seq(tblName) => TableIdentifier(tblName)
       case Seq(dbName, tblName) => TableIdentifier(tblName, Some(dbName))
       case _ =>
-        throw new AnalysisException(
-          s"$quoted is not a valid TableIdentifier as it has more than 2 name parts.")
+        throw QueryCompilationErrors.identifierHavingMoreThanTwoNamePartsError(
+          quoted, "TableIdentifier")
     }
 
     def asFunctionIdentifier: FunctionIdentifier = parts match {
       case Seq(funcName) => FunctionIdentifier(funcName)
       case Seq(dbName, funcName) => FunctionIdentifier(funcName, Some(dbName))
       case _ =>
-        throw new AnalysisException(
-          s"$quoted is not a valid FunctionIdentifier as it has more than 2 name parts.")
+        throw QueryCompilationErrors.identifierHavingMoreThanTwoNamePartsError(
+          quoted, "FunctionIdentifier")
     }
 
     def quoted: String = parts.map(quoteIfNeeded).mkString(".")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index 02db2293ec64a..a779e50a1f214 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -22,10 +22,10 @@ import java.util.Collections
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, UnresolvedV2Relation}
 import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, CreateTableAsSelectStatement, CreateTableStatement, ReplaceTableAsSelectStatement, ReplaceTableStatement, SerdeInfo}
 import org.apache.spark.sql.connector.catalog.TableChange._
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.types.{ArrayType, DataType, MapType, NullType, StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -386,8 +386,7 @@
       case _ => dt.isInstanceOf[NullType]
     }
     if (containsNullType(dt)) {
-      throw new AnalysisException(
-        s"Cannot create tables with ${NullType.simpleString} type.")
+      throw QueryCompilationErrors.cannotCreateTablesWithNullTypeError()
     }
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala
index dcd352267a178..06358590a1e46 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala
@@ -18,8 +18,8 @@
 package org.apache.spark.sql.connector.catalog
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 
 /**
@@ -192,11 +192,11 @@ private[sql] trait LookupCatalog extends Logging {
           ident.namespace match {
             case Array(db) => FunctionIdentifier(ident.name, Some(db))
             case _ =>
-              throw new AnalysisException(s"Unsupported function name '$ident'")
+              throw QueryCompilationErrors.unsupportedFunctionNameError(ident.toString)
           }
         }
 
-      case _ => throw new AnalysisException("function is only supported in v1 catalog")
+      case _ => throw QueryCompilationErrors.functionUnsupportedInV2CatalogError()
     }
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 9cc649ee32f16..7626c339cb420 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -22,17 +22,18 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, ResolvedNamespace, ResolvedTable, ResolvedView, TableAlreadyExistsException}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, InvalidUDFClassException}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, InvalidUDFClassException}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CreateMap, Expression, GroupingID, NamedExpression, SpecifiedWindowFrame, WindowFrame, WindowFunction, WindowSpecDefinition}
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, LogicalPlan, SerdeInfo}
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.catalyst.util.{toPrettySQL, FailFastMode, ParseMode, PermissiveMode}
-import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange, Table, TableCapability, TableChange, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, NamespaceChange, Table, TableCapability, TableChange, V1Table}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+import org.apache.spark.sql.connector.expressions.{NamedReference, Transform}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.streaming.OutputMode
-import org.apache.spark.sql.types.{AbstractDataType, DataType, StructField, StructType}
+import org.apache.spark.sql.types.{AbstractDataType, DataType, NullType, StructField, StructType}
 
 /**
  * Object for grouping error messages from exceptions thrown during query compilation.
@@ -1355,4 +1356,39 @@ private[spark] object QueryCompilationErrors {
   def cannotUseIntervalTypeInTableSchemaError(): Throwable = {
     new AnalysisException("Cannot use interval type in the table schema.")
   }
+
+  def cannotConvertBucketWithSortColumnsToTransformError(spec: BucketSpec): Throwable = {
+    new AnalysisException(
+      s"Cannot convert bucketing with sort columns to a transform: $spec")
+  }
+
+  def cannotConvertTransformsToPartitionColumnsError(nonIdTransforms: Seq[Transform]): Throwable = {
+    new AnalysisException("Transforms cannot be converted to partition columns: " +
+      nonIdTransforms.map(_.describe).mkString(", "))
+  }
+
+  def cannotPartitionByNestedColumnError(reference: NamedReference): Throwable = {
+    new AnalysisException(s"Cannot partition by nested column: $reference")
+  }
+
+  def cannotUseCatalogError(plugin: CatalogPlugin, msg: String): Throwable = {
+    new AnalysisException(s"Cannot use catalog ${plugin.name}: $msg")
+  }
+
+  def identifierHavingMoreThanTwoNamePartsError(
+      quoted: String, identifier: String): Throwable = {
+    new AnalysisException(s"$quoted is not a valid $identifier as it has more than 2 name parts.")
+  }
+
+  def emptyMultipartIdentifierError(): Throwable = {
+    new AnalysisException("multi-part identifier cannot be empty.")
+  }
+
+  def cannotCreateTablesWithNullTypeError(): Throwable = {
+    new AnalysisException(s"Cannot create tables with ${NullType.simpleString} type.")
+  }
+
+  def functionUnsupportedInV2CatalogError(): Throwable = {
+    new AnalysisException("function is only supported in v1 catalog")
+  }
 }
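
Reviewer note: the pattern applied throughout this diff is to replace ad-hoc `new AnalysisException(...)` construction at call sites with named factory methods on `QueryCompilationErrors` that return a `Throwable`, so the message text lives in one place and each call site shrinks to a single `throw`. Below is a minimal, self-contained sketch of that pattern; `CompilationException`, `CompilationErrors`, and `CallSite` are hypothetical stand-ins used only for illustration, not Spark's actual `AnalysisException`/`QueryCompilationErrors` classes.

```scala
// Sketch of the error-centralization pattern used in this PR (hypothetical names).

// Stand-in for AnalysisException: an exception type that only carries a message.
class CompilationException(message: String) extends Exception(message)

// One object owns every message template; each method returns a Throwable so the
// call site stays a one-line `throw`.
object CompilationErrors {
  def cannotPartitionByNestedColumnError(ref: String): Throwable =
    new CompilationException(s"Cannot partition by nested column: $ref")

  def identifierHavingMoreThanTwoNamePartsError(quoted: String, identifier: String): Throwable =
    new CompilationException(
      s"$quoted is not a valid $identifier as it has more than 2 name parts.")
}

object CallSite {
  // The call site keeps only the control flow; the wording is centralized above.
  def partitionColumn(fieldNames: Seq[String]): String = {
    if (fieldNames.size > 1) {
      throw CompilationErrors.cannotPartitionByNestedColumnError(fieldNames.mkString("."))
    }
    fieldNames.head
  }

  def main(args: Array[String]): Unit = {
    println(partitionColumn(Seq("col")))   // prints "col"
    // partitionColumn(Seq("a", "b"))      // would throw the centralized error
  }
}
```

Having the helpers return `Throwable` rather than throwing internally keeps the `throw` expression visible at each call site, so match arms such as the `case _ =>` branches in this diff still type as `Nothing` and do not widen the result type of the surrounding `match`.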