From 931ab065df3952487028316ebd49c2895d947bf2 Mon Sep 17 00:00:00 2001
From: "zhipeng.mao"
Date: Sun, 15 Sep 2024 13:35:00 +0800
Subject: [PATCH] [SPARK-48824][SQL] Add Identity Column SQL syntax

### What changes were proposed in this pull request?

Add SQL support for creating identity columns. Users can declare a column as `GENERATED ALWAYS AS IDENTITY(identityColumnSpec)`, where identity values are **always** generated by the system, or `GENERATED BY DEFAULT AS IDENTITY(identityColumnSpec)`, where users can also supply their own identity values. Users can optionally specify the starting value of the column (default = 1) and the increment/step of the column (default = 1).

We also accept both clause orders, `START WITH <start> INCREMENT BY <step>` and `INCREMENT BY <step> START WITH <start>`, i.e. flexible ordering of the starting and increment values, since both variants are used in the wild by other systems (e.g. [PostgreSQL](https://www.postgresql.org/docs/current/sql-createsequence.html), [Oracle](https://docs.oracle.com/en/database/oracle/oracle-database/23/sqlrf/CREATE-SEQUENCE.html#GUID-E9C78A8C-615A-4757-B2A8-5E6EFB130571)).

For example, we can define
```
CREATE TABLE default.example (
  id LONG GENERATED ALWAYS AS IDENTITY,
  id1 LONG GENERATED ALWAYS AS IDENTITY(),
  id2 LONG GENERATED BY DEFAULT AS IDENTITY(START WITH 0),
  id3 LONG GENERATED ALWAYS AS IDENTITY(INCREMENT BY 2),
  id4 LONG GENERATED BY DEFAULT AS IDENTITY(START WITH 0 INCREMENT BY -10),
  id5 LONG GENERATED ALWAYS AS IDENTITY(INCREMENT BY 2 START WITH -8),
  value LONG
)
```
This will enable defining identity columns in Spark SQL for data sources that support it.

To be more specific, this PR:
- Adds parser support for `GENERATED { BY DEFAULT | ALWAYS } AS IDENTITY` in create/replace table statements. Identity column specifications are temporarily stored in the field's metadata, and are then parsed/verified in `DataSourceV2Strategy` and used to instantiate the v2 [Column].
- Adds `TableCatalog::capabilities()` and `TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_IDENTITY_COLUMNS`. This is used to determine whether specifying identity columns is allowed or an exception should be thrown (see the catalog-side sketch after this description).

### Why are the changes needed?

A SQL API is needed to create identity columns.

### Does this PR introduce _any_ user-facing change?

It allows the aforementioned SQL syntax to create identity columns in a table.

### How was this patch tested?

Positive and negative unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47614 from zhipengmao-db/zhipengmao-db/SPARK-48824-id-syntax.
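For context on the capability mechanism, here is a minimal catalog-side sketch in Scala. It mirrors the `InMemoryTableCatalog` change in this patch; the class name `IdentityAwareCatalog`, the helper `describeIdentity`, and the choice of `DelegatingCatalogExtension` as a base class are illustrative assumptions, not part of this PR.

```scala
import java.util

import scala.jdk.CollectionConverters._

import org.apache.spark.sql.connector.catalog.{Column, DelegatingCatalogExtension, TableCatalogCapability}

// Hypothetical catalog that opts in to SQL-defined identity columns.
// DelegatingCatalogExtension is only a convenient base for this sketch; any
// TableCatalog implementation can advertise the capability the same way.
class IdentityAwareCatalog extends DelegatingCatalogExtension {
  override def capabilities(): util.Set[TableCatalogCapability] = {
    Set(
      TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE,
      TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_IDENTITY_COLUMNS
    ).asJava
  }
}

object IdentityAwareCatalog {
  // During createTable/replaceTable a connector receives the parsed spec on the
  // v2 Column; identityColumnSpec() is null for non-identity columns.
  def describeIdentity(col: Column): String = {
    Option(col.identityColumnSpec()) match {
      case Some(spec) =>
        s"${col.name()}: start=${spec.getStart}, step=${spec.getStep}, " +
          s"allowExplicitInsert=${spec.isAllowExplicitInsert}"
      case None =>
        s"${col.name()}: not an identity column"
    }
  }
}
```

A catalog that does not advertise `SUPPORTS_CREATE_TABLE_WITH_IDENTITY_COLUMNS` causes `DataSourceV2Strategy` to reject create/replace table statements containing identity columns, as exercised by the negative tests in this patch.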
Authored-by: zhipeng.mao Signed-off-by: Wenchen Fan --- .../resources/error/error-conditions.json | 24 ++ docs/sql-ref-ansi-compliance.md | 2 + .../spark/sql/catalyst/parser/SqlBaseLexer.g4 | 2 + .../sql/catalyst/parser/SqlBaseParser.g4 | 21 +- .../connector/catalog/IdentityColumnSpec.java | 88 ++++++++ .../spark/sql/errors/QueryParsingErrors.scala | 19 ++ .../spark/sql/connector/catalog/Column.java | 24 +- .../catalog/TableCatalogCapability.java | 20 +- .../sql/catalyst/parser/AstBuilder.scala | 66 +++++- .../plans/logical/ColumnDefinition.scala | 68 ++++-- .../sql/catalyst/util/IdentityColumn.scala | 78 +++++++ .../sql/connector/catalog/CatalogV2Util.scala | 47 +++- .../sql/internal/connector/ColumnImpl.scala | 3 +- .../sql/catalyst/parser/DDLParserSuite.scala | 213 +++++++++++++++++- .../catalog/InMemoryTableCatalog.scala | 3 +- .../datasources/DataSourceStrategy.scala | 7 +- .../datasources/v2/DataSourceV2Strategy.scala | 5 +- .../sql-tests/results/ansi/keywords.sql.out | 2 + .../sql-tests/results/keywords.sql.out | 2 + .../sql/connector/DataSourceV2SQLSuite.scala | 58 +++++ .../sql/execution/command/DDLSuite.scala | 11 + .../ThriftServerWithSparkContextSuite.scala | 2 +- 22 files changed, 724 insertions(+), 41 deletions(-) create mode 100644 sql/api/src/main/java/org/apache/spark/sql/connector/catalog/IdentityColumnSpec.java create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IdentityColumn.scala diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index a6d8550716b96..38472f44fb599 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -1589,6 +1589,30 @@ ], "sqlState" : "42601" }, + "IDENTITY_COLUMNS_DUPLICATED_SEQUENCE_GENERATOR_OPTION" : { + "message" : [ + "Duplicated IDENTITY column sequence generator option: <sequenceGeneratorOption>." + ], + "sqlState" : "42601" + }, + "IDENTITY_COLUMNS_ILLEGAL_STEP" : { + "message" : [ + "IDENTITY column step cannot be 0." + ], + "sqlState" : "42611" + }, + "IDENTITY_COLUMNS_UNSUPPORTED_DATA_TYPE" : { + "message" : [ + "DataType <dataType> is not supported for IDENTITY columns." + ], + "sqlState" : "428H2" + }, + "IDENTITY_COLUMN_WITH_DEFAULT_VALUE" : { + "message" : [ + "A column cannot have both a default value and an identity column specification but column <colName> has default value: (<defaultValue>) and identity column specification: (<identityColumnSpec>)." + ], + "sqlState" : "42623" + }, "ILLEGAL_DAY_OF_WEEK" : { "message" : [ "Illegal input for day of week: <string>." diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md index fe5ddf27bf6c4..7987e5eb6012a 100644 --- a/docs/sql-ref-ansi-compliance.md +++ b/docs/sql-ref-ansi-compliance.md @@ -536,12 +536,14 @@ Below is a list of all the keywords in Spark SQL.
|HOUR|non-reserved|non-reserved|non-reserved| |HOURS|non-reserved|non-reserved|non-reserved| |IDENTIFIER|non-reserved|non-reserved|non-reserved| +|IDENTITY|non-reserved|non-reserved|non-reserved| |IF|non-reserved|non-reserved|not a keyword| |IGNORE|non-reserved|non-reserved|non-reserved| |IMMEDIATE|non-reserved|non-reserved|non-reserved| |IMPORT|non-reserved|non-reserved|non-reserved| |IN|reserved|non-reserved|reserved| |INCLUDE|non-reserved|non-reserved|non-reserved| +|INCREMENT|non-reserved|non-reserved|non-reserved| |INDEX|non-reserved|non-reserved|non-reserved| |INDEXES|non-reserved|non-reserved|non-reserved| |INNER|reserved|strict-non-reserved|reserved| diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 index 96a58b99debeb..c82ee57a25179 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 @@ -256,12 +256,14 @@ BINARY_HEX: 'X'; HOUR: 'HOUR'; HOURS: 'HOURS'; IDENTIFIER_KW: 'IDENTIFIER'; +IDENTITY: 'IDENTITY'; IF: 'IF'; IGNORE: 'IGNORE'; IMMEDIATE: 'IMMEDIATE'; IMPORT: 'IMPORT'; IN: 'IN'; INCLUDE: 'INCLUDE'; +INCREMENT: 'INCREMENT'; INDEX: 'INDEX'; INDEXES: 'INDEXES'; INNER: 'INNER'; diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index 3ea408ca42703..1840b68878419 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -1297,7 +1297,22 @@ colDefinitionOption ; generationExpression - : GENERATED ALWAYS AS LEFT_PAREN expression RIGHT_PAREN + : GENERATED ALWAYS AS LEFT_PAREN expression RIGHT_PAREN #generatedColumn + | GENERATED (ALWAYS | BY DEFAULT) AS IDENTITY identityColSpec? #identityColumn + ; + +identityColSpec + : LEFT_PAREN sequenceGeneratorOption* RIGHT_PAREN + ; + +sequenceGeneratorOption + : START WITH start=sequenceGeneratorStartOrStep + | INCREMENT BY step=sequenceGeneratorStartOrStep + ; + +sequenceGeneratorStartOrStep + : MINUS? INTEGER_VALUE + | MINUS? BIGINT_LITERAL ; complexColTypeList @@ -1591,11 +1606,13 @@ ansiNonReserved | HOUR | HOURS | IDENTIFIER_KW + | IDENTITY | IF | IGNORE | IMMEDIATE | IMPORT | INCLUDE + | INCREMENT | INDEX | INDEXES | INPATH @@ -1942,12 +1959,14 @@ nonReserved | HOUR | HOURS | IDENTIFIER_KW + | IDENTITY | IF | IGNORE | IMMEDIATE | IMPORT | IN | INCLUDE + | INCREMENT | INDEX | INDEXES | INPATH diff --git a/sql/api/src/main/java/org/apache/spark/sql/connector/catalog/IdentityColumnSpec.java b/sql/api/src/main/java/org/apache/spark/sql/connector/catalog/IdentityColumnSpec.java new file mode 100644 index 0000000000000..4a8943736bd31 --- /dev/null +++ b/sql/api/src/main/java/org/apache/spark/sql/connector/catalog/IdentityColumnSpec.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; +import org.apache.spark.annotation.Evolving; + +import java.util.Objects; + +/** + * Identity column specification. + */ +@Evolving +public class IdentityColumnSpec { + private final long start; + private final long step; + private final boolean allowExplicitInsert; + + /** + * Creates an identity column specification. + * @param start the start value to generate the identity values + * @param step the step value to generate the identity values + * @param allowExplicitInsert whether the identity column allows explicit insertion of values + */ + public IdentityColumnSpec(long start, long step, boolean allowExplicitInsert) { + this.start = start; + this.step = step; + this.allowExplicitInsert = allowExplicitInsert; + } + + /** + * @return the start value to generate the identity values + */ + public long getStart() { + return start; + } + + /** + * @return the step value to generate the identity values + */ + public long getStep() { + return step; + } + + /** + * @return whether the identity column allows explicit insertion of values + */ + public boolean isAllowExplicitInsert() { + return allowExplicitInsert; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + IdentityColumnSpec that = (IdentityColumnSpec) o; + return start == that.start && + step == that.step && + allowExplicitInsert == that.allowExplicitInsert; + } + + @Override + public int hashCode() { + return Objects.hash(start, step, allowExplicitInsert); + } + + @Override + public String toString() { + return "IdentityColumnSpec{" + + "start=" + start + + ", step=" + step + + ", allowExplicitInsert=" + allowExplicitInsert + + "}"; + } +} diff --git a/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala b/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala index 5f7fcb92f7bd1..b19607a28f06c 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala @@ -556,6 +556,25 @@ private[sql] object QueryParsingErrors extends DataTypeErrorsBase { ctx) } + def identityColumnUnsupportedDataType( + ctx: IdentityColumnContext, + dataType: String): Throwable = { + new ParseException("IDENTITY_COLUMNS_UNSUPPORTED_DATA_TYPE", Map("dataType" -> dataType), ctx) + } + + def identityColumnIllegalStep(ctx: IdentityColSpecContext): Throwable = { + new ParseException("IDENTITY_COLUMNS_ILLEGAL_STEP", Map.empty, ctx) + } + + def identityColumnDuplicatedSequenceGeneratorOption( + ctx: IdentityColSpecContext, + sequenceGeneratorOption: String): Throwable = { + new ParseException( + "IDENTITY_COLUMNS_DUPLICATED_SEQUENCE_GENERATOR_OPTION", + Map("sequenceGeneratorOption" -> sequenceGeneratorOption), + ctx) + } + def createViewWithBothIfNotExistsAndReplaceError(ctx: CreateViewContext): Throwable = { new ParseException(errorClass = "_LEGACY_ERROR_TEMP_0052", ctx) } diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java index b191438dbc3ee..8b32940d7a657 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java @@ -53,7 +53,7 @@ static Column create( boolean nullable, String comment, String metadataInJSON) { - return new ColumnImpl(name, dataType, nullable, comment, null, null, metadataInJSON); + return new ColumnImpl(name, dataType, nullable, comment, null, null, null, metadataInJSON); } static Column create( @@ -63,7 +63,8 @@ static Column create( String comment, ColumnDefaultValue defaultValue, String metadataInJSON) { - return new ColumnImpl(name, dataType, nullable, comment, defaultValue, null, metadataInJSON); + return new ColumnImpl(name, dataType, nullable, comment, defaultValue, + null, null, metadataInJSON); } static Column create( @@ -74,7 +75,18 @@ static Column create( String generationExpression, String metadataInJSON) { return new ColumnImpl(name, dataType, nullable, comment, null, - generationExpression, metadataInJSON); + generationExpression, null, metadataInJSON); + } + + static Column create( + String name, + DataType dataType, + boolean nullable, + String comment, + IdentityColumnSpec identityColumnSpec, + String metadataInJSON) { + return new ColumnImpl(name, dataType, nullable, comment, null, + null, identityColumnSpec, metadataInJSON); } /** @@ -113,6 +125,12 @@ static Column create( @Nullable String generationExpression(); + /** + * Returns the identity column specification of this table column. Null means no identity column. + */ + @Nullable + IdentityColumnSpec identityColumnSpec(); + /** * Returns the column metadata in JSON format. */ diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java index 5ccb15ff1f0a4..dceac1b484cf2 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java @@ -59,5 +59,23 @@ public enum TableCatalogCapability { * {@link TableCatalog#createTable}. * See {@link Column#defaultValue()}. */ - SUPPORT_COLUMN_DEFAULT_VALUE + SUPPORT_COLUMN_DEFAULT_VALUE, + + /** + * Signals that the TableCatalog supports defining identity columns upon table creation in SQL. + *
<p>
+ * Without this capability, any create/replace table statements with an identity column defined + * in the table schema will throw an exception during analysis. + *
<p>
+ * An identity column is defined with syntax: + * {@code colName colType GENERATED ALWAYS AS IDENTITY(identityColumnSpec)} + * or + * {@code colName colType GENERATED BY DEFAULT AS IDENTITY(identityColumnSpec)} + * identityColumnSpec is defined with syntax: {@code [START WITH start | INCREMENT BY step]*} + *
<p>
+ * IdentitySpec is included in the column definition for APIs like + * {@link TableCatalog#createTable}. + * See {@link Column#identityColumnSpec()}. + */ + SUPPORTS_CREATE_TABLE_WITH_IDENTITY_COLUMNS } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index edcb417da123b..cb0e0e35c3704 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -45,7 +45,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.PARAMETER import org.apache.spark.sql.catalyst.types.DataTypeUtils import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils, IntervalUtils} import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ, getZoneId, stringToDate, stringToTimestamp, stringToTimestampWithoutTimeZone} -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsNamespaces, TableCatalog, TableWritePrivilege} +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, IdentityColumnSpec, SupportsNamespaces, TableCatalog, TableWritePrivilege} import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform} import org.apache.spark.sql.errors.{DataTypeErrorsBase, QueryCompilationErrors, QueryParsingErrors, SqlScriptingErrors} @@ -3619,13 +3619,19 @@ class AstBuilder extends DataTypeAstBuilder } } + val dataType = typedVisit[DataType](ctx.dataType) ColumnDefinition( name = name, - dataType = typedVisit[DataType](ctx.dataType), + dataType = dataType, nullable = nullable, comment = commentSpec.map(visitCommentSpec), defaultValue = defaultExpression.map(visitDefaultExpression), - generationExpression = generationExpression.map(visitGenerationExpression) + generationExpression = generationExpression.collect { + case ctx: GeneratedColumnContext => visitGeneratedColumn(ctx) + }, + identityColumnSpec = generationExpression.collect { + case ctx: IdentityColumnContext => visitIdentityColumn(ctx, dataType) + } ) } @@ -3681,11 +3687,63 @@ class AstBuilder extends DataTypeAstBuilder /** * Create a generation expression string. */ - override def visitGenerationExpression(ctx: GenerationExpressionContext): String = + override def visitGeneratedColumn(ctx: GeneratedColumnContext): String = withOrigin(ctx) { getDefaultExpression(ctx.expression(), "GENERATED").originalSQL } + /** + * Parse and verify IDENTITY column definition. + * + * @param ctx The parser context. + * @param dataType The data type of column defined as IDENTITY column. Used for verification. + * @return Tuple containing start, step and allowExplicitInsert. + */ + protected def visitIdentityColumn( + ctx: IdentityColumnContext, + dataType: DataType): IdentityColumnSpec = { + if (dataType != LongType && dataType != IntegerType) { + throw QueryParsingErrors.identityColumnUnsupportedDataType(ctx, dataType.toString) + } + // We support two flavors of syntax: + // (1) GENERATED ALWAYS AS IDENTITY (...) + // (2) GENERATED BY DEFAULT AS IDENTITY (...) + // (1) forbids explicit inserts, while (2) allows. 
+ val allowExplicitInsert = ctx.BY() != null && ctx.DEFAULT() != null + val (start, step) = visitIdentityColSpec(ctx.identityColSpec()) + + new IdentityColumnSpec(start, step, allowExplicitInsert) + } + + override def visitIdentityColSpec(ctx: IdentityColSpecContext): (Long, Long) = { + val defaultStart = 1 + val defaultStep = 1 + if (ctx == null) { + return (defaultStart, defaultStep) + } + var (start, step): (Option[Long], Option[Long]) = (None, None) + ctx.sequenceGeneratorOption().asScala.foreach { option => + if (option.start != null) { + if (start.isDefined) { + throw QueryParsingErrors.identityColumnDuplicatedSequenceGeneratorOption(ctx, "START") + } + start = Some(option.start.getText.toLong) + } else if (option.step != null) { + if (step.isDefined) { + throw QueryParsingErrors.identityColumnDuplicatedSequenceGeneratorOption(ctx, "STEP") + } + step = Some(option.step.getText.toLong) + if (step.get == 0L) { + throw QueryParsingErrors.identityColumnIllegalStep(ctx) + } + } else { + throw SparkException + .internalError(s"Invalid identity column sequence generator option: ${option.getText}") + } + } + (start.getOrElse(defaultStart), step.getOrElse(defaultStep)) + } + /** * Create an optional comment string. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala index 83e50aa33c70d..043214711ccf9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ColumnDefinition.scala @@ -21,10 +21,10 @@ import org.apache.spark.SparkException import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, UnaryExpression, Unevaluable} import org.apache.spark.sql.catalyst.parser.ParserInterface -import org.apache.spark.sql.catalyst.util.GeneratedColumn +import org.apache.spark.sql.catalyst.util.{GeneratedColumn, IdentityColumn} import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.validateDefaultValueExpr import org.apache.spark.sql.catalyst.util.ResolveDefaultColumnsUtils.{CURRENT_DEFAULT_COLUMN_METADATA_KEY, EXISTS_DEFAULT_COLUMN_METADATA_KEY} -import org.apache.spark.sql.connector.catalog.{Column => V2Column, ColumnDefaultValue} +import org.apache.spark.sql.connector.catalog.{Column => V2Column, ColumnDefaultValue, IdentityColumnSpec} import org.apache.spark.sql.connector.expressions.LiteralValue import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.internal.connector.ColumnImpl @@ -41,7 +41,11 @@ case class ColumnDefinition( comment: Option[String] = None, defaultValue: Option[DefaultValueExpression] = None, generationExpression: Option[String] = None, + identityColumnSpec: Option[IdentityColumnSpec] = None, metadata: Metadata = Metadata.empty) extends Expression with Unevaluable { + assert( + generationExpression.isEmpty || identityColumnSpec.isEmpty, + "A ColumnDefinition cannot contain both a generation expression and an identity column spec.") override def children: Seq[Expression] = defaultValue.toSeq @@ -58,6 +62,7 @@ case class ColumnDefinition( comment.orNull, defaultValue.map(_.toV2(statement, name)).orNull, generationExpression.orNull, + identityColumnSpec.orNull, if (metadata == Metadata.empty) null else metadata.json) } @@ -75,8 +80,19 @@ case class ColumnDefinition( generationExpression.foreach { 
generationExpr => metadataBuilder.putString(GeneratedColumn.GENERATION_EXPRESSION_METADATA_KEY, generationExpr) } + encodeIdentityColumnSpec(metadataBuilder) StructField(name, dataType, nullable, metadataBuilder.build()) } + + private def encodeIdentityColumnSpec(metadataBuilder: MetadataBuilder): Unit = { + identityColumnSpec.foreach { spec: IdentityColumnSpec => + metadataBuilder.putLong(IdentityColumn.IDENTITY_INFO_START, spec.getStart) + metadataBuilder.putLong(IdentityColumn.IDENTITY_INFO_STEP, spec.getStep) + metadataBuilder.putBoolean( + IdentityColumn.IDENTITY_INFO_ALLOW_EXPLICIT_INSERT, + spec.isAllowExplicitInsert) + } + } } object ColumnDefinition { @@ -87,6 +103,9 @@ object ColumnDefinition { metadataBuilder.remove(CURRENT_DEFAULT_COLUMN_METADATA_KEY) metadataBuilder.remove(EXISTS_DEFAULT_COLUMN_METADATA_KEY) metadataBuilder.remove(GeneratedColumn.GENERATION_EXPRESSION_METADATA_KEY) + metadataBuilder.remove(IdentityColumn.IDENTITY_INFO_START) + metadataBuilder.remove(IdentityColumn.IDENTITY_INFO_STEP) + metadataBuilder.remove(IdentityColumn.IDENTITY_INFO_ALLOW_EXPLICIT_INSERT) val hasDefaultValue = col.getCurrentDefaultValue().isDefined && col.getExistenceDefaultValue().isDefined @@ -97,6 +116,15 @@ object ColumnDefinition { None } val generationExpr = GeneratedColumn.getGenerationExpression(col) + val identityColumnSpec = if (col.metadata.contains(IdentityColumn.IDENTITY_INFO_START)) { + Some(new IdentityColumnSpec( + col.metadata.getLong(IdentityColumn.IDENTITY_INFO_START), + col.metadata.getLong(IdentityColumn.IDENTITY_INFO_STEP), + col.metadata.getBoolean(IdentityColumn.IDENTITY_INFO_ALLOW_EXPLICIT_INSERT) + )) + } else { + None + } ColumnDefinition( col.name, col.dataType, @@ -104,6 +132,7 @@ object ColumnDefinition { col.getComment(), defaultValue, generationExpr, + identityColumnSpec, metadataBuilder.build() ) } @@ -124,18 +153,8 @@ object ColumnDefinition { s"Command $cmd should not have column default value expression.") } cmd.columns.foreach { col => - if (col.defaultValue.isDefined && col.generationExpression.isDefined) { - throw new AnalysisException( - errorClass = "GENERATED_COLUMN_WITH_DEFAULT_VALUE", - messageParameters = Map( - "colName" -> col.name, - "defaultValue" -> col.defaultValue.get.originalSQL, - "genExpr" -> col.generationExpression.get - ) - ) - } - col.defaultValue.foreach { default => + checkDefaultColumnConflicts(col) validateDefaultValueExpr(default, statement, col.name, col.dataType) } } @@ -143,6 +162,29 @@ object ColumnDefinition { case _ => } } + + private def checkDefaultColumnConflicts(col: ColumnDefinition): Unit = { + if (col.generationExpression.isDefined) { + throw new AnalysisException( + errorClass = "GENERATED_COLUMN_WITH_DEFAULT_VALUE", + messageParameters = Map( + "colName" -> col.name, + "defaultValue" -> col.defaultValue.get.originalSQL, + "genExpr" -> col.generationExpression.get + ) + ) + } + if (col.identityColumnSpec.isDefined) { + throw new AnalysisException( + errorClass = "IDENTITY_COLUMN_WITH_DEFAULT_VALUE", + messageParameters = Map( + "colName" -> col.name, + "defaultValue" -> col.defaultValue.get.originalSQL, + "identityColumnSpec" -> col.identityColumnSpec.get.toString + ) + ) + } + } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IdentityColumn.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IdentityColumn.scala new file mode 100644 index 0000000000000..26a3cb026d317 --- /dev/null +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IdentityColumn.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import org.apache.spark.sql.connector.catalog.{Identifier, IdentityColumnSpec, TableCatalog, TableCatalogCapability} +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.types.{StructField, StructType} + +/** + * This object contains utility methods and values for Identity Columns + */ +object IdentityColumn { + val IDENTITY_INFO_START = "identity.start" + val IDENTITY_INFO_STEP = "identity.step" + val IDENTITY_INFO_ALLOW_EXPLICIT_INSERT = "identity.allowExplicitInsert" + + /** + * If `schema` contains any identity columns, check whether the table catalog supports identity + * columns. Otherwise throw an error. + */ + def validateIdentityColumn( + schema: StructType, + catalog: TableCatalog, + ident: Identifier): Unit = { + if (hasIdentityColumns(schema)) { + if (!catalog + .capabilities() + .contains(TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_IDENTITY_COLUMNS)) { + throw QueryCompilationErrors.unsupportedTableOperationError( + catalog, ident, operation = "identity column" + ) + } + } + } + + /** + * Whether the given `field` is an identity column + */ + def isIdentityColumn(field: StructField): Boolean = { + field.metadata.contains(IDENTITY_INFO_START) + } + + /** + * Returns the identity information stored in the column metadata if it exists + */ + def getIdentityInfo(field: StructField): Option[IdentityColumnSpec] = { + if (isIdentityColumn(field)) { + Some(new IdentityColumnSpec( + field.metadata.getLong(IDENTITY_INFO_START), + field.metadata.getLong(IDENTITY_INFO_STEP), + field.metadata.getBoolean(IDENTITY_INFO_ALLOW_EXPLICIT_INSERT))) + } else { + None + } + } + + /** + * Whether the `schema` has one or more identity columns + */ + def hasIdentityColumns(schema: StructType): Boolean = { + schema.exists(isIdentityColumn) + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index 6698f0a021400..9b7f68070a1a4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, Named import org.apache.spark.sql.catalyst.catalog.ClusterBySpec import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec} -import
org.apache.spark.sql.catalyst.util.GeneratedColumn +import org.apache.spark.sql.catalyst.util.{GeneratedColumn, IdentityColumn} import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._ import org.apache.spark.sql.connector.catalog.TableChange._ import org.apache.spark.sql.connector.catalog.functions.UnboundFunction @@ -579,18 +579,10 @@ private[sql] object CatalogV2Util { val isDefaultColumn = f.getCurrentDefaultValue().isDefined && f.getExistenceDefaultValue().isDefined val isGeneratedColumn = GeneratedColumn.isGeneratedColumn(f) - if (isDefaultColumn && isGeneratedColumn) { - throw new AnalysisException( - errorClass = "GENERATED_COLUMN_WITH_DEFAULT_VALUE", - messageParameters = Map( - "colName" -> f.name, - "defaultValue" -> f.getCurrentDefaultValue().get, - "genExpr" -> GeneratedColumn.getGenerationExpression(f).get - ) - ) - } - + val isIdentityColumn = IdentityColumn.isIdentityColumn(f) if (isDefaultColumn) { + checkDefaultColumnConflicts(f) + val e = analyze( f, statementType = "Column analysis", @@ -611,10 +603,41 @@ private[sql] object CatalogV2Util { Seq("comment", GeneratedColumn.GENERATION_EXPRESSION_METADATA_KEY)) Column.create(f.name, f.dataType, f.nullable, f.getComment().orNull, GeneratedColumn.getGenerationExpression(f).get, metadataAsJson(cleanedMetadata)) + } else if (isIdentityColumn) { + val cleanedMetadata = metadataWithKeysRemoved( + Seq("comment", + IdentityColumn.IDENTITY_INFO_START, + IdentityColumn.IDENTITY_INFO_STEP, + IdentityColumn.IDENTITY_INFO_ALLOW_EXPLICIT_INSERT)) + Column.create(f.name, f.dataType, f.nullable, f.getComment().orNull, + IdentityColumn.getIdentityInfo(f).get, metadataAsJson(cleanedMetadata)) } else { val cleanedMetadata = metadataWithKeysRemoved(Seq("comment")) Column.create(f.name, f.dataType, f.nullable, f.getComment().orNull, metadataAsJson(cleanedMetadata)) } } + + private def checkDefaultColumnConflicts(f: StructField): Unit = { + if (GeneratedColumn.isGeneratedColumn(f)) { + throw new AnalysisException( + errorClass = "GENERATED_COLUMN_WITH_DEFAULT_VALUE", + messageParameters = Map( + "colName" -> f.name, + "defaultValue" -> f.getCurrentDefaultValue().get, + "genExpr" -> GeneratedColumn.getGenerationExpression(f).get + ) + ) + } + if (IdentityColumn.isIdentityColumn(f)) { + throw new AnalysisException( + errorClass = "IDENTITY_COLUMN_WITH_DEFAULT_VALUE", + messageParameters = Map( + "colName" -> f.name, + "defaultValue" -> f.getCurrentDefaultValue().get, + "identityColumnSpec" -> IdentityColumn.getIdentityInfo(f).get.toString + ) + ) + } + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala index 2a67ffc4bbef5..47889410561e3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.internal.connector -import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue} +import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, IdentityColumnSpec} import org.apache.spark.sql.types.DataType // The standard concrete implementation of data source V2 column. 
@@ -28,4 +28,5 @@ case class ColumnImpl( comment: String, defaultValue: ColumnDefaultValue, generationExpression: String, + identityColumnSpec: IdentityColumnSpec, metadataInJSON: String) extends Column diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 0f2bb791f3465..b7e2490b552cc 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -20,14 +20,16 @@ package org.apache.spark.sql.catalyst.parser import java.util.Locale import org.apache.spark.SparkThrowable +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions.{EqualTo, Hex, Literal} import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.connector.catalog.IdentityColumnSpec import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition.{after, first} import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, ClusterByTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform} import org.apache.spark.sql.connector.expressions.LogicalExpressions.bucket import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{Decimal, IntegerType, LongType, StringType, StructType, TimestampType} +import org.apache.spark.sql.types.{DataType, Decimal, IntegerType, LongType, StringType, StructType, TimestampType} import org.apache.spark.storage.StorageLevelMapper import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} @@ -2856,10 +2858,217 @@ class DDLParserSuite extends AnalysisTest { exception = parseException( "CREATE TABLE my_tab(a INT, b INT GENERATED ALWAYS AS a + 1) USING PARQUET"), condition = "PARSE_SYNTAX_ERROR", - parameters = Map("error" -> "'a'", "hint" -> ": missing '('") + parameters = Map("error" -> "'a'", "hint" -> "") ) } + test("SPARK-48824: implement parser support for " + + "GENERATED ALWAYS/BY DEFAULT AS IDENTITY columns in tables ") { + def parseAndCompareIdentityColumnPlan( + identityColumnDataTypeStr: String, + identityColumnDefStr: String, + identityColumnSpecStr: String, + expectedDataType: DataType, + expectedStart: Long, + expectedStep: Long, + expectedAllowExplicitInsert: Boolean): Unit = { + val columnsWithIdentitySpec = Seq( + ColumnDefinition( + name = "id", + dataType = expectedDataType, + nullable = true, + identityColumnSpec = Some( + new IdentityColumnSpec( + expectedStart, + expectedStep, + expectedAllowExplicitInsert + ) + ) + ), + ColumnDefinition("val", IntegerType) + ) + comparePlans( + parsePlan( + s"CREATE TABLE my_tab(id $identityColumnDataTypeStr GENERATED $identityColumnDefStr" + + s" AS IDENTITY $identityColumnSpecStr, val INT) USING parquet" + ), + CreateTable( + UnresolvedIdentifier(Seq("my_tab")), + columnsWithIdentitySpec, + Seq.empty[Transform], + UnresolvedTableSpec( + Map.empty[String, String], + Some("parquet"), + OptionList(Seq.empty), + None, + None, + None, + false + ), + false + ) + ) + + comparePlans( + parsePlan( + s"REPLACE TABLE my_tab(id $identityColumnDataTypeStr GENERATED $identityColumnDefStr" + + s" AS IDENTITY $identityColumnSpecStr, val INT) USING parquet" + ), + ReplaceTable( + UnresolvedIdentifier(Seq("my_tab")), + columnsWithIdentitySpec, + 
Seq.empty[Transform], + UnresolvedTableSpec( + Map.empty[String, String], + Some("parquet"), + OptionList(Seq.empty), + None, + None, + None, + false + ), + false + ) + ) + } + for { + identityColumnDefStr <- Seq("BY DEFAULT", "ALWAYS") + identityColumnDataTypeStr <- Seq("BIGINT", "INT") + } { + val expectedAllowExplicitInsert = identityColumnDefStr == "BY DEFAULT" + val expectedDataType = identityColumnDataTypeStr match { + case "BIGINT" => LongType + case "INT" => IntegerType + } + parseAndCompareIdentityColumnPlan( + identityColumnDataTypeStr, + identityColumnDefStr, + "(START WITH 2 INCREMENT BY 2)", + expectedDataType, + expectedStart = 2, + expectedStep = 2, + expectedAllowExplicitInsert = expectedAllowExplicitInsert) + parseAndCompareIdentityColumnPlan( + identityColumnDataTypeStr, + identityColumnDefStr, + "(START WITH -2 INCREMENT BY -2)", + expectedDataType, + expectedStart = -2, + expectedStep = -2, + expectedAllowExplicitInsert = expectedAllowExplicitInsert) + parseAndCompareIdentityColumnPlan( + identityColumnDataTypeStr, + identityColumnDefStr, + "(START WITH 2)", + expectedDataType, + expectedStart = 2, + expectedStep = 1, + expectedAllowExplicitInsert = expectedAllowExplicitInsert) + parseAndCompareIdentityColumnPlan( + identityColumnDataTypeStr, + identityColumnDefStr, + "(START WITH -2)", + expectedDataType, + expectedStart = -2, + expectedStep = 1, + expectedAllowExplicitInsert = expectedAllowExplicitInsert) + parseAndCompareIdentityColumnPlan( + identityColumnDataTypeStr, + identityColumnDefStr, + "(INCREMENT BY 2)", + expectedDataType, + expectedStart = 1, + expectedStep = 2, + expectedAllowExplicitInsert = expectedAllowExplicitInsert) + parseAndCompareIdentityColumnPlan( + identityColumnDataTypeStr, + identityColumnDefStr, + "(INCREMENT BY -2)", + expectedDataType, + expectedStart = 1, + expectedStep = -2, + expectedAllowExplicitInsert = expectedAllowExplicitInsert) + parseAndCompareIdentityColumnPlan( + identityColumnDataTypeStr, + identityColumnDefStr, + "()", + expectedDataType, + expectedStart = 1, + expectedStep = 1, + expectedAllowExplicitInsert = expectedAllowExplicitInsert) + parseAndCompareIdentityColumnPlan( + identityColumnDataTypeStr, + identityColumnDefStr, + "", + expectedDataType, + expectedStart = 1, + expectedStep = 1, + expectedAllowExplicitInsert = expectedAllowExplicitInsert) + } + } + + test("SPARK-48824: Column cannot have both a generation expression and an identity column spec") { + checkError( + exception = intercept[AnalysisException] { + parsePlan(s"CREATE TABLE testcat.my_tab(id BIGINT GENERATED ALWAYS AS 1" + + s" GENERATED ALWAYS AS IDENTITY, val INT) USING foo") + }, + condition = "PARSE_SYNTAX_ERROR", + parameters = Map("error" -> "'1'", "hint" -> "") + ) + } + + test("SPARK-48824: Identity column step must not be zero") { + checkError( + exception = intercept[ParseException] { + parsePlan( + s"CREATE TABLE testcat.my_tab" + + s"(id BIGINT GENERATED ALWAYS AS IDENTITY(INCREMENT BY 0), val INT) USING foo" + ) + }, + condition = "IDENTITY_COLUMNS_ILLEGAL_STEP", + parameters = Map.empty, + context = ExpectedContext( + fragment = "id BIGINT GENERATED ALWAYS AS IDENTITY(INCREMENT BY 0)", + start = 28, + stop = 81) + ) + } + + test("SPARK-48824: Identity column datatype must be long or integer") { + checkError( + exception = intercept[ParseException] { + parsePlan( + s"CREATE TABLE testcat.my_tab(id FLOAT GENERATED ALWAYS AS IDENTITY(), val INT) USING foo" + ) + }, + condition = "IDENTITY_COLUMNS_UNSUPPORTED_DATA_TYPE", + parameters = 
Map("dataType" -> "FloatType"), + context = + ExpectedContext(fragment = "id FLOAT GENERATED ALWAYS AS IDENTITY()", start = 28, stop = 66) + ) + } + + test("SPARK-48824: Identity column sequence generator option cannot be duplicated") { + val identityColumnSpecStrs = Seq( + "(START WITH 0 START WITH 1)", + "(INCREMENT BY 1 INCREMENT BY 2)", + "(START WITH 0 INCREMENT BY 1 START WITH 1)", + "(INCREMENT BY 1 START WITH 0 INCREMENT BY 2)" + ) + for { + identitySpecStr <- identityColumnSpecStrs + } { + val exception = intercept[ParseException] { + parsePlan( + s"CREATE TABLE testcat.my_tab" + + s"(id BIGINT GENERATED ALWAYS AS IDENTITY $identitySpecStr, val INT) USING foo" + ) + } + assert(exception.getErrorClass === "IDENTITY_COLUMNS_DUPLICATED_SEQUENCE_GENERATOR_OPTION") + } + } + test("SPARK-42681: Relax ordering constraint for ALTER TABLE ADD COLUMN options") { // Positive test cases to verify that column definition options could be applied in any order. val expectedPlan = AddColumns( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala index 982de88e58847..56ed3bb243e19 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala @@ -167,7 +167,8 @@ class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamesp override def capabilities: java.util.Set[TableCatalogCapability] = { Set( TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE, - TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS + TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS, + TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_IDENTITY_COLUMNS ).asJava } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 1dd2659a1b169..2be4b236872f0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoDir, I import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 import org.apache.spark.sql.catalyst.types.DataTypeUtils -import org.apache.spark.sql.catalyst.util.{GeneratedColumn, ResolveDefaultColumns, V2ExpressionBuilder} +import org.apache.spark.sql.catalyst.util.{GeneratedColumn, IdentityColumn, ResolveDefaultColumns, V2ExpressionBuilder} import org.apache.spark.sql.connector.catalog.{SupportsRead, V1Table} import org.apache.spark.sql.connector.catalog.TableCapability._ import org.apache.spark.sql.connector.expressions.{Expression => V2Expression, NullOrdering, SortDirection, SortOrder => V2SortOrder, SortValue} @@ -146,6 +146,11 @@ object DataSourceAnalysis extends Rule[LogicalPlan] { tableDesc.identifier, "generated columns") } + if (IdentityColumn.hasIdentityColumns(newSchema)) { + throw QueryCompilationErrors.unsupportedTableOperationError( + tableDesc.identifier, "identity columns") + } + val newTableDesc = tableDesc.copy(schema = newSchema) CreateDataSourceTableCommand(newTableDesc, ignoreIfExists = mode == SaveMode.Ignore) diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 112ee2c5450b2..d7f46c32f99a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -32,7 +32,8 @@ import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.util.{toPrettySQL, GeneratedColumn, ResolveDefaultColumns, V2ExpressionBuilder} +import org.apache.spark.sql.catalyst.util.{toPrettySQL, GeneratedColumn, + IdentityColumn, ResolveDefaultColumns, V2ExpressionBuilder} import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsDeleteV2, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog, TruncatableTable} import org.apache.spark.sql.connector.catalog.index.SupportsIndex import org.apache.spark.sql.connector.expressions.{FieldReference, LiteralValue} @@ -185,6 +186,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat val statementType = "CREATE TABLE" GeneratedColumn.validateGeneratedColumns( c.tableSchema, catalog.asTableCatalog, ident, statementType) + IdentityColumn.validateIdentityColumn(c.tableSchema, catalog.asTableCatalog, ident) CreateTableExec( catalog.asTableCatalog, @@ -214,6 +216,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat val statementType = "REPLACE TABLE" GeneratedColumn.validateGeneratedColumns( c.tableSchema, catalog.asTableCatalog, ident, statementType) + IdentityColumn.validateIdentityColumn(c.tableSchema, catalog.asTableCatalog, ident) val v2Columns = columns.map(_.toV2Column(statementType)).toArray catalog match { diff --git a/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out b/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out index 81ccc0f9efc13..b464427d379a3 100644 --- a/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out @@ -142,6 +142,7 @@ HAVING true HOUR false HOURS false IDENTIFIER false +IDENTITY false IF false IGNORE false ILIKE false @@ -149,6 +150,7 @@ IMMEDIATE false IMPORT false IN true INCLUDE false +INCREMENT false INDEX false INDEXES false INNER true diff --git a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out index e145c57332eb2..16436d7a722ce 100644 --- a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out @@ -142,6 +142,7 @@ HAVING false HOUR false HOURS false IDENTIFIER false +IDENTITY false IF false IGNORE false ILIKE false @@ -149,6 +150,7 @@ IMMEDIATE false IMPORT false IN false INCLUDE false +INCREMENT false INDEX false INDEXES false INNER false diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 998d459cd436c..5df7b62cfb285 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -1753,6 +1753,64 @@ class DataSourceV2SQLSuiteV1Filter } } + test("SPARK-48824: Column cannot have both an identity column spec and a default value") { + val tblName = "my_tab" + val tableDefinition = + s"$tblName(id BIGINT GENERATED ALWAYS AS IDENTITY DEFAULT 0, name STRING)" + withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> "foo") { + for (statement <- Seq("CREATE TABLE", "REPLACE TABLE")) { + withTable(s"testcat.$tblName") { + if (statement == "REPLACE TABLE") { + sql(s"CREATE TABLE testcat.$tblName(a INT) USING foo") + } + checkError( + exception = intercept[AnalysisException] { + sql(s"$statement testcat.$tableDefinition USING foo") + }, + condition = "IDENTITY_COLUMN_WITH_DEFAULT_VALUE", + parameters = Map( + "colName" -> "id", + "defaultValue" -> "0", + "identityColumnSpec" -> + "IdentityColumnSpec{start=1, step=1, allowExplicitInsert=false}") + ) + } + } + } + } + + test("SPARK-48824: Identity columns only allowed with TableCatalogs that " + + "SUPPORTS_CREATE_TABLE_WITH_IDENTITY_COLUMNS") { + val tblName = "my_tab" + val tableDefinition = + s"$tblName(id BIGINT GENERATED ALWAYS AS IDENTITY(), val INT)" + for (statement <- Seq("CREATE TABLE", "REPLACE TABLE")) { + // InMemoryTableCatalog.capabilities() = {SUPPORTS_CREATE_TABLE_WITH_IDENTITY_COLUMNS} + withTable(s"testcat.$tblName") { + if (statement == "REPLACE TABLE") { + sql(s"CREATE TABLE testcat.$tblName(a INT) USING foo") + } + // Can create table with an identity column + sql(s"$statement testcat.$tableDefinition USING foo") + assert(catalog("testcat").asTableCatalog.tableExists(Identifier.of(Array(), tblName))) + } + // BasicInMemoryTableCatalog.capabilities() = {} + withSQLConf("spark.sql.catalog.dummy" -> classOf[BasicInMemoryTableCatalog].getName) { + checkError( + exception = intercept[AnalysisException] { + sql("USE dummy") + sql(s"$statement dummy.$tableDefinition USING foo") + }, + condition = "UNSUPPORTED_FEATURE.TABLE_OPERATION", + parameters = Map( + "tableName" -> "`dummy`.`my_tab`", + "operation" -> "identity column" + ) + ) + } + } + } + test("SPARK-46972: asymmetrical replacement for char/varchar in V2SessionCatalog.createTable") { // unset this config to use the default v2 session catalog. 
spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 6e58b0e62ed63..8307326f17fcf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -2288,6 +2288,17 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase { ) } + test("SPARK-48824: No identity columns with V1") { + checkError( + exception = intercept[AnalysisException] { + sql(s"create table t(a int, b bigint generated always as identity()) using parquet") + }, + condition = "UNSUPPORTED_FEATURE.TABLE_OPERATION", + parameters = Map("tableName" -> "`spark_catalog`.`default`.`t`", + "operation" -> "identity columns") + ) + } + test("SPARK-44837: Error when altering partition column in non-delta table") { withTable("t") { sql("CREATE TABLE t(i INT, j INT, k INT) USING parquet PARTITIONED BY (i, j)") diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala index edef6371be8ae..5b8ee4ea9714f 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala @@ -214,7 +214,7 @@ trait ThriftServerWithSparkContextSuite extends SharedThriftServer { val sessionHandle = client.openSession(user, "") val infoValue = client.getInfo(sessionHandle, GetInfoType.CLI_ODBC_KEYWORDS) // scalastyle:off line.size.limit - assert(infoValue.getStringValue == 
"ADD,AFTER,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,ARCHIVE,ARRAY,AS,ASC,AT,AUTHORIZATION,BEGIN,BETWEEN,BIGINT,BINARY,BINDING,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CALLED,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHAR,CHARACTER,CHECK,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLATION,COLLATIONS,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONSTRAINT,CONTAINS,COST,CREATE,CROSS,CUBE,CURRENT,CURRENT_DATE,CURRENT_TIME,CURRENT_TIMESTAMP,CURRENT_USER,DATA,DATABASE,DATABASES,DATE,DATEADD,DATEDIFF,DATE_ADD,DATE_DIFF,DAY,DAYOFYEAR,DAYS,DBPROPERTIES,DEC,DECIMAL,DECLARE,DEFAULT,DEFINED,DEFINER,DELETE,DELIMITED,DESC,DESCRIBE,DETERMINISTIC,DFS,DIRECTORIES,DIRECTORY,DISTINCT,DISTRIBUTE,DIV,DO,DOUBLE,DROP,ELSE,END,ESCAPE,ESCAPED,EVOLUTION,EXCEPT,EXCHANGE,EXCLUDE,EXECUTE,EXISTS,EXPLAIN,EXPORT,EXTENDED,EXTERNAL,EXTRACT,FALSE,FETCH,FIELDS,FILEFORMAT,FILTER,FIRST,FLOAT,FOLLOWING,FOR,FOREIGN,FORMAT,FORMATTED,FROM,FULL,FUNCTION,FUNCTIONS,GENERATED,GLOBAL,GRANT,GROUP,GROUPING,HAVING,HOUR,HOURS,IDENTIFIER,IF,IGNORE,ILIKE,IMMEDIATE,IMPORT,IN,INCLUDE,INDEX,INDEXES,INNER,INPATH,INPUT,INPUTFORMAT,INSERT,INT,INTEGER,INTERSECT,INTERVAL,INTO,INVOKER,IS,ITEMS,ITERATE,JOIN,KEYS,LANGUAGE,LAST,LATERAL,LAZY,LEADING,LEAVE,LEFT,LIKE,LIMIT,LINES,LIST,LOAD,LOCAL,LOCATION,LOCK,LOCKS,LOGICAL,LONG,MACRO,MAP,MATCHED,MERGE,MICROSECOND,MICROSECONDS,MILLISECOND,MILLISECONDS,MINUS,MINUTE,MINUTES,MODIFIES,MONTH,MONTHS,MSCK,NAME,NAMESPACE,NAMESPACES,NANOSECOND,NANOSECONDS,NATURAL,NO,NONE,NOT,NULL,NULLS,NUMERIC,OF,OFFSET,ON,ONLY,OPTION,OPTIONS,OR,ORDER,OUT,OUTER,OUTPUTFORMAT,OVER,OVERLAPS,OVERLAY,OVERWRITE,PARTITION,PARTITIONED,PARTITIONS,PERCENT,PIVOT,PLACING,POSITION,PRECEDING,PRIMARY,PRINCIPALS,PROPERTIES,PURGE,QUARTER,QUERY,RANGE,READS,REAL,RECORDREADER,RECORDWRITER,RECOVER,REDUCE,REFERENCES,REFRESH,RENAME,REPAIR,REPEAT,REPEATABLE,REPLACE,RESET,RESPECT,RESTRICT,RETURN,RETURNS,REVOKE,RIGHT,ROLE,ROLES,ROLLBACK,ROLLUP,ROW,ROWS,SCHEMA,SCHEMAS,SECOND,SECONDS,SECURITY,SELECT,SEMI,SEPARATED,SERDE,SERDEPROPERTIES,SESSION_USER,SET,SETS,SHORT,SHOW,SINGLE,SKEWED,SMALLINT,SOME,SORT,SORTED,SOURCE,SPECIFIC,SQL,START,STATISTICS,STORED,STRATIFY,STRING,STRUCT,SUBSTR,SUBSTRING,SYNC,SYSTEM_TIME,SYSTEM_VERSION,TABLE,TABLES,TABLESAMPLE,TARGET,TBLPROPERTIES,TERMINATED,THEN,TIME,TIMEDIFF,TIMESTAMP,TIMESTAMPADD,TIMESTAMPDIFF,TIMESTAMP_LTZ,TIMESTAMP_NTZ,TINYINT,TO,TOUCH,TRAILING,TRANSACTION,TRANSACTIONS,TRANSFORM,TRIM,TRUE,TRUNCATE,TRY_CAST,TYPE,UNARCHIVE,UNBOUNDED,UNCACHE,UNION,UNIQUE,UNKNOWN,UNLOCK,UNPIVOT,UNSET,UNTIL,UPDATE,USE,USER,USING,VALUES,VAR,VARCHAR,VARIABLE,VARIANT,VERSION,VIEW,VIEWS,VOID,WEEK,WEEKS,WHEN,WHERE,WHILE,WINDOW,WITH,WITHIN,X,YEAR,YEARS,ZONE") + assert(infoValue.getStringValue == 
"ADD,AFTER,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,ARCHIVE,ARRAY,AS,ASC,AT,AUTHORIZATION,BEGIN,BETWEEN,BIGINT,BINARY,BINDING,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CALLED,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHAR,CHARACTER,CHECK,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLATION,COLLATIONS,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONSTRAINT,CONTAINS,COST,CREATE,CROSS,CUBE,CURRENT,CURRENT_DATE,CURRENT_TIME,CURRENT_TIMESTAMP,CURRENT_USER,DATA,DATABASE,DATABASES,DATE,DATEADD,DATEDIFF,DATE_ADD,DATE_DIFF,DAY,DAYOFYEAR,DAYS,DBPROPERTIES,DEC,DECIMAL,DECLARE,DEFAULT,DEFINED,DEFINER,DELETE,DELIMITED,DESC,DESCRIBE,DETERMINISTIC,DFS,DIRECTORIES,DIRECTORY,DISTINCT,DISTRIBUTE,DIV,DO,DOUBLE,DROP,ELSE,END,ESCAPE,ESCAPED,EVOLUTION,EXCEPT,EXCHANGE,EXCLUDE,EXECUTE,EXISTS,EXPLAIN,EXPORT,EXTENDED,EXTERNAL,EXTRACT,FALSE,FETCH,FIELDS,FILEFORMAT,FILTER,FIRST,FLOAT,FOLLOWING,FOR,FOREIGN,FORMAT,FORMATTED,FROM,FULL,FUNCTION,FUNCTIONS,GENERATED,GLOBAL,GRANT,GROUP,GROUPING,HAVING,HOUR,HOURS,IDENTIFIER,IDENTITY,IF,IGNORE,ILIKE,IMMEDIATE,IMPORT,IN,INCLUDE,INCREMENT,INDEX,INDEXES,INNER,INPATH,INPUT,INPUTFORMAT,INSERT,INT,INTEGER,INTERSECT,INTERVAL,INTO,INVOKER,IS,ITEMS,ITERATE,JOIN,KEYS,LANGUAGE,LAST,LATERAL,LAZY,LEADING,LEAVE,LEFT,LIKE,LIMIT,LINES,LIST,LOAD,LOCAL,LOCATION,LOCK,LOCKS,LOGICAL,LONG,MACRO,MAP,MATCHED,MERGE,MICROSECOND,MICROSECONDS,MILLISECOND,MILLISECONDS,MINUS,MINUTE,MINUTES,MODIFIES,MONTH,MONTHS,MSCK,NAME,NAMESPACE,NAMESPACES,NANOSECOND,NANOSECONDS,NATURAL,NO,NONE,NOT,NULL,NULLS,NUMERIC,OF,OFFSET,ON,ONLY,OPTION,OPTIONS,OR,ORDER,OUT,OUTER,OUTPUTFORMAT,OVER,OVERLAPS,OVERLAY,OVERWRITE,PARTITION,PARTITIONED,PARTITIONS,PERCENT,PIVOT,PLACING,POSITION,PRECEDING,PRIMARY,PRINCIPALS,PROPERTIES,PURGE,QUARTER,QUERY,RANGE,READS,REAL,RECORDREADER,RECORDWRITER,RECOVER,REDUCE,REFERENCES,REFRESH,RENAME,REPAIR,REPEAT,REPEATABLE,REPLACE,RESET,RESPECT,RESTRICT,RETURN,RETURNS,REVOKE,RIGHT,ROLE,ROLES,ROLLBACK,ROLLUP,ROW,ROWS,SCHEMA,SCHEMAS,SECOND,SECONDS,SECURITY,SELECT,SEMI,SEPARATED,SERDE,SERDEPROPERTIES,SESSION_USER,SET,SETS,SHORT,SHOW,SINGLE,SKEWED,SMALLINT,SOME,SORT,SORTED,SOURCE,SPECIFIC,SQL,START,STATISTICS,STORED,STRATIFY,STRING,STRUCT,SUBSTR,SUBSTRING,SYNC,SYSTEM_TIME,SYSTEM_VERSION,TABLE,TABLES,TABLESAMPLE,TARGET,TBLPROPERTIES,TERMINATED,THEN,TIME,TIMEDIFF,TIMESTAMP,TIMESTAMPADD,TIMESTAMPDIFF,TIMESTAMP_LTZ,TIMESTAMP_NTZ,TINYINT,TO,TOUCH,TRAILING,TRANSACTION,TRANSACTIONS,TRANSFORM,TRIM,TRUE,TRUNCATE,TRY_CAST,TYPE,UNARCHIVE,UNBOUNDED,UNCACHE,UNION,UNIQUE,UNKNOWN,UNLOCK,UNPIVOT,UNSET,UNTIL,UPDATE,USE,USER,USING,VALUES,VAR,VARCHAR,VARIABLE,VARIANT,VERSION,VIEW,VIEWS,VOID,WEEK,WEEKS,WHEN,WHERE,WHILE,WINDOW,WITH,WITHIN,X,YEAR,YEARS,ZONE") // scalastyle:on line.size.limit } }