diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala index 18f656748ac17..841017ae6c000 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.analysis.{caseSensitiveResolution, Analyzer import org.apache.spark.sql.catalyst.catalog.SessionCatalog import org.apache.spark.sql.connect.planner.SparkConnectPlanner import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, InMemoryCatalog} +import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -67,16 +68,17 @@ class ProtoToParsedPlanTestSuite extends SparkFunSuite with SharedSparkSession { protected val inputFilePath: Path = baseResourcePath.resolve("queries") protected val goldenFilePath: Path = baseResourcePath.resolve("explain-results") + private val emptyProps: util.Map[String, String] = util.Collections.emptyMap() private val analyzer = { val inMemoryCatalog = new InMemoryCatalog inMemoryCatalog.initialize("primary", CaseInsensitiveStringMap.empty()) - inMemoryCatalog.createNamespace(Array("tempdb"), util.Collections.emptyMap()) + inMemoryCatalog.createNamespace(Array("tempdb"), emptyProps) inMemoryCatalog.createTable( Identifier.of(Array("tempdb"), "myTable"), new StructType().add("id", "long"), - Array.empty, - util.Collections.emptyMap()) + Array.empty[Transform], + emptyProps) val catalogManager = new CatalogManager( inMemoryCatalog, diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala index b2500a2dbf29a..d3f17187a3754 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala @@ -27,6 +27,7 @@ import org.apache.logging.log4j.Level import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange} +import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog import org.apache.spark.sql.jdbc.DockerIntegrationFunSuite import org.apache.spark.sql.test.SharedSparkSession @@ -118,7 +119,7 @@ private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerInte // Drop non empty namespace without cascade catalog.createNamespace(Array("foo"), commentMap.asJava) assert(catalog.namespaceExists(Array("foo")) === true) - catalog.createTable(ident1, schema, Array.empty, emptyProps) + catalog.createTable(ident1, schema, Array.empty[Transform], emptyProps) if (supportsDropSchemaRestrict) { intercept[NonEmptyNamespaceException] { catalog.dropNamespace(Array("foo"), cascade = false) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java new file mode 100644 index 0000000000000..d2c8f25e73904 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import java.util.Map; +import javax.annotation.Nullable; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.internal.connector.ColumnImpl; +import org.apache.spark.sql.types.DataType; + +/** + * An interface representing a column of a {@link Table}. It defines basic properties of a column, + * such as name and data type, as well as some advanced ones like default column value. + *
<p>
+ * Data Sources do not need to implement it. They should consume it in APIs like + * {@link TableCatalog#createTable(Identifier, Column[], Transform[], Map)}, and report it in + * {@link Table#columns()} by calling the static {@code create} functions of this interface to + * create it. + */ +@Evolving +public interface Column { + + static Column create(String name, DataType dataType) { + return create(name, dataType, true); + } + + static Column create(String name, DataType dataType, boolean nullable) { + return create(name, dataType, nullable, null, null, null); + } + + static Column create( + String name, + DataType dataType, + boolean nullable, + String comment, + ColumnDefaultValue defaultValue, + String metadataInJSON) { + return new ColumnImpl(name, dataType, nullable, comment, defaultValue, metadataInJSON); + } + + /** + * Returns the name of this table column. + */ + String name(); + + /** + * Returns the data type of this table column. + */ + DataType dataType(); + + /** + * Returns true if this column may produce null values. + */ + boolean nullable(); + + /** + * Returns the comment of this table column. Null means no comment. + */ + @Nullable + String comment(); + + /** + * Returns the default value of this table column. Null means no default value. + */ + @Nullable + ColumnDefaultValue defaultValue(); + + /** + * Returns the column metadata in JSON format. + */ + @Nullable + String metadataInJSON(); +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ColumnDefaultValue.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ColumnDefaultValue.java new file mode 100644 index 0000000000000..b8e75c11c813a --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ColumnDefaultValue.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import java.util.Objects; +import javax.annotation.Nonnull; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.Literal; + +/** + * A class representing the default value of a column. It contains both the SQL string and literal + * value of the user-specified default value expression. The SQL string should be re-evaluated for + * each table writing command, which may produce different values if the default value expression is + * something like {@code CURRENT_DATE()}. The literal value is used to back-fill existing data if + * new columns with default value are added. Note: the back-fill can be lazy. The data sources can + * remember the column default value and let the reader fill the column value when reading existing + * data that do not have these new columns. 
+ */ +@Evolving +public class ColumnDefaultValue { + private String sql; + private Literal value; + + public ColumnDefaultValue(String sql, Literal value) { + this.sql = sql; + this.value = value; + } + + /** + * Returns the SQL string (Spark SQL dialect) of the default value expression. This is the + * original string contents of the SQL expression specified at the time the column was created in + * a CREATE TABLE, REPLACE TABLE, or ADD COLUMN command. For example, for + * "CREATE TABLE t (col INT DEFAULT 40 + 2)", this returns the string literal "40 + 2" (without + * quotation marks). + */ + @Nonnull + public String getSql() { + return sql; + } + + /** + * Returns the default value literal. This is the literal value corresponding to + * {@link #getSql()}. For the example in the doc of {@link #getSql()}, this returns a literal + * integer with a value of 42. + */ + @Nonnull + public Literal getValue() { + return value; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof ColumnDefaultValue)) return false; + ColumnDefaultValue that = (ColumnDefaultValue) o; + return sql.equals(that.sql) && value.equals(that.value); + } + + @Override + public int hashCode() { + return Objects.hash(sql, value); + } + + @Override + public String toString() { + return "ColumnDefaultValue{sql='" + sql + "\', value=" + value + '}'; + } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java index 35455a0ed9975..4337a7c615208 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java @@ -54,6 +54,19 @@ @Evolving public interface StagingTableCatalog extends TableCatalog { + /** + * Stage the creation of a table, preparing it to be committed into the metastore. + *
<p>
+ * This is deprecated. Please override + * {@link #stageCreate(Identifier, Column[], Transform[], Map)} instead. + */ + @Deprecated + StagedTable stageCreate( + Identifier ident, + StructType schema, + Transform[] partitions, + Map properties) throws TableAlreadyExistsException, NoSuchNamespaceException; + /** * Stage the creation of a table, preparing it to be committed into the metastore. *
<p>
@@ -64,7 +77,7 @@ public interface StagingTableCatalog extends TableCatalog { * committed, an exception should be thrown by {@link StagedTable#commitStagedChanges()}. * * @param ident a table identifier - * @param schema the schema of the new table, as a struct type + * @param columns the column of the new table * @param partitions transforms to use for partitioning data in the table * @param properties a string map of table properties * @return metadata for the new table @@ -72,11 +85,26 @@ public interface StagingTableCatalog extends TableCatalog { * @throws UnsupportedOperationException If a requested partition transform is not supported * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional) */ - StagedTable stageCreate( + default StagedTable stageCreate( + Identifier ident, + Column[] columns, + Transform[] partitions, + Map properties) throws TableAlreadyExistsException, NoSuchNamespaceException { + return stageCreate(ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties); + } + + /** + * Stage the replacement of a table, preparing it to be committed into the metastore when the + * returned table's {@link StagedTable#commitStagedChanges()} is called. + *
<p>
+ * This is deprecated, please override + * {@link #stageReplace(Identifier, StructType, Transform[], Map)} instead. + */ + StagedTable stageReplace( Identifier ident, StructType schema, Transform[] partitions, - Map properties) throws TableAlreadyExistsException, NoSuchNamespaceException; + Map properties) throws NoSuchNamespaceException, NoSuchTableException; /** * Stage the replacement of a table, preparing it to be committed into the metastore when the @@ -97,7 +125,7 @@ StagedTable stageCreate( * operation. * * @param ident a table identifier - * @param schema the schema of the new table, as a struct type + * @param columns the columns of the new table * @param partitions transforms to use for partitioning data in the table * @param properties a string map of table properties * @return metadata for the new table @@ -105,11 +133,27 @@ StagedTable stageCreate( * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional) * @throws NoSuchTableException If the table does not exist */ - StagedTable stageReplace( + default StagedTable stageReplace( + Identifier ident, + Column[] columns, + Transform[] partitions, + Map properties) throws NoSuchNamespaceException, NoSuchTableException { + return stageReplace( + ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties); + } + + /** + * Stage the creation or replacement of a table, preparing it to be committed into the metastore + * when the returned table's {@link StagedTable#commitStagedChanges()} is called. + *
<p>
+ * This is deprecated, please override + * {@link #stageCreateOrReplace(Identifier, Column[], Transform[], Map)} instead. + */ + StagedTable stageCreateOrReplace( Identifier ident, StructType schema, Transform[] partitions, - Map properties) throws NoSuchNamespaceException, NoSuchTableException; + Map properties) throws NoSuchNamespaceException; /** * Stage the creation or replacement of a table, preparing it to be committed into the metastore @@ -129,16 +173,19 @@ StagedTable stageReplace( * the staged changes are committed but the table doesn't exist at commit time. * * @param ident a table identifier - * @param schema the schema of the new table, as a struct type + * @param columns the columns of the new table * @param partitions transforms to use for partitioning data in the table * @param properties a string map of table properties * @return metadata for the new table * @throws UnsupportedOperationException If a requested partition transform is not supported * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional) */ - StagedTable stageCreateOrReplace( + default StagedTable stageCreateOrReplace( Identifier ident, - StructType schema, + Column[] columns, Transform[] partitions, - Map properties) throws NoSuchNamespaceException; + Map properties) throws NoSuchNamespaceException { + return stageCreateOrReplace( + ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties); + } } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java index 8f7a87404837c..b9753a08aba96 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java @@ -51,9 +51,20 @@ public interface Table { /** * Returns the schema of this table. If the table is not readable and doesn't have a schema, an * empty schema can be returned here. + *
<p>
+ * This is deprecated. Please override {@link #columns} instead. */ + @Deprecated StructType schema(); + /** + * Returns the columns of this table. If the table is not readable and doesn't have a schema, an + * empty array can be returned here. + */ + default Column[] columns() { + return CatalogV2Util.structTypeToV2Columns(schema()); + } + /** * Returns the physical partitioning of this table. */ diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java index b04c7e55138e1..82622d65205ec 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java @@ -159,11 +159,24 @@ default boolean tableExists(Identifier ident) { } } + /** + * Create a table in the catalog. + *
<p>
+ * This is deprecated. Please override + * {@link #createTable(Identifier, Column[], Transform[], Map)} instead. + */ + @Deprecated + Table createTable( + Identifier ident, + StructType schema, + Transform[] partitions, + Map properties) throws TableAlreadyExistsException, NoSuchNamespaceException; + /** * Create a table in the catalog. * * @param ident a table identifier - * @param schema the schema of the new table, as a struct type + * @param columns the columns of the new table. * @param partitions transforms to use for partitioning data in the table * @param properties a string map of table properties * @return metadata for the new table @@ -171,11 +184,13 @@ default boolean tableExists(Identifier ident) { * @throws UnsupportedOperationException If a requested partition transform is not supported * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional) */ - Table createTable( + default Table createTable( Identifier ident, - StructType schema, + Column[] columns, Transform[] partitions, - Map properties) throws TableAlreadyExistsException, NoSuchNamespaceException; + Map properties) throws TableAlreadyExistsException, NoSuchNamespaceException { + return createTable(ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties); + } /** * Apply a set of {@link TableChange changes} to a table in the catalog. diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java index cf735ed94521c..609cfab2d568e 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java @@ -140,7 +140,7 @@ static TableChange addColumn( boolean isNullable, String comment, ColumnPosition position, - String defaultValue) { + ColumnDefaultValue defaultValue) { return new AddColumn(fieldNames, dataType, isNullable, comment, position, defaultValue); } @@ -228,7 +228,7 @@ static TableChange updateColumnPosition(String[] fieldNames, ColumnPosition newP * If the field does not exist, the change will result in an {@link IllegalArgumentException}. * * @param fieldNames field names of the column to update - * @param newDefaultValue the new default value + * @param newDefaultValue the new default value SQL string (Spark SQL dialect). * @return a TableChange for the update */ static TableChange updateColumnDefaultValue(String[] fieldNames, String newDefaultValue) { @@ -383,7 +383,9 @@ interface ColumnChange extends TableChange { } /** - * A TableChange to add a field. + * A TableChange to add a field. The implementation may need to back-fill all the existing data + * to add this new column, or remember the column default value specified here and let the reader + * fill the column value when reading existing data that do not have this new column. *
<p>
* If the field already exists, the change must result in an {@link IllegalArgumentException}. * If the new field is nested and its parent does not exist or is not a struct, the change must @@ -395,7 +397,7 @@ final class AddColumn implements ColumnChange { private final boolean isNullable; private final String comment; private final ColumnPosition position; - private final String defaultValue; + private final ColumnDefaultValue defaultValue; private AddColumn( String[] fieldNames, @@ -403,7 +405,7 @@ private AddColumn( boolean isNullable, String comment, ColumnPosition position, - String defaultValue) { + ColumnDefaultValue defaultValue) { this.fieldNames = fieldNames; this.dataType = dataType; this.isNullable = isNullable; @@ -436,7 +438,7 @@ public ColumnPosition position() { } @Nullable - public String defaultValue() { return defaultValue; } + public ColumnDefaultValue defaultValue() { return defaultValue; } @Override public boolean equals(Object o) { @@ -691,6 +693,12 @@ public String[] fieldNames() { return fieldNames; } + /** + * Returns the column default value SQL string (Spark SQL dialect). The default value literal + * is not provided as updating column default values does not need to back-fill existing data. + * Null means dropping the column default value. + */ + @Nullable public String newDefaultValue() { return newDefaultValue; } @Override diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 46cc0b0fbf0fc..d7cc34d6f15bc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1249,7 +1249,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor } SubqueryAlias( catalog.name +: ident.asMultipartIdentifier, - StreamingRelationV2(None, table.name, table, options, table.schema.toAttributes, + StreamingRelationV2(None, table.name, table, options, table.columns.toAttributes, Some(catalog), Some(ident), v1Fallback)) } else { SubqueryAlias( @@ -3722,7 +3722,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor colsToAdd(resolvedParentName) = fieldsAdded :+ col.colName resolvedPosition } - val schema = r.table.schema + val schema = r.table.columns.asSchema val resolvedCols = cols.map { col => col.path match { case Some(parent: UnresolvedFieldName) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala index e6be5c2395551..2d26e281607ae 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala @@ -181,7 +181,7 @@ object ResolvedTable { catalog: TableCatalog, identifier: Identifier, table: Table): ResolvedTable = { - val schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(table.schema) + val schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(table.columns.asSchema) ResolvedTable(catalog, identifier, table, schema.toAttributes) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index af70f07bc8792..9c639a4bce69f 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -20,6 +20,9 @@ package org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.analysis.{FieldName, FieldPosition} import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.trees.{LeafLike, UnaryLike} +import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns +import org.apache.spark.sql.connector.catalog.ColumnDefaultValue +import org.apache.spark.sql.connector.expressions.LiteralValue import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.types.DataType @@ -135,6 +138,16 @@ case class QualifiedColType( def name: Seq[String] = path.map(_.name).getOrElse(Nil) :+ colName def resolved: Boolean = path.forall(_.resolved) && position.forall(_.resolved) + + def getV2Default: ColumnDefaultValue = { + default.map { sql => + val e = ResolveDefaultColumns.analyze(colName, dataType, sql, "ALTER TABLE") + assert(e.resolved && e.foldable, + "The existence default value must be a simple SQL string that is resolved and foldable, " + + "but got: " + sql) + new ColumnDefaultValue(sql, LiteralValue(e.eval(), dataType)) + }.orNull + } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala index 94f2a57066356..eb9d45f06ec79 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala @@ -120,7 +120,7 @@ case class AddColumns( col.nullable, col.comment.orNull, col.position.map(_.position).orNull, - col.default.orNull) + col.getV2Default) } } @@ -156,7 +156,7 @@ case class ReplaceColumns( col.nullable, col.comment.orNull, null, - col.default.orNull) + col.getV2Default) } deleteChanges ++ addChanges } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala index 667c0988d0cc3..be7d74b078232 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala @@ -151,16 +151,28 @@ object ResolveDefaultColumns { field: StructField, statementType: String, metadataKey: String = CURRENT_DEFAULT_COLUMN_METADATA_KEY): Expression = { + analyze(field.name, field.dataType, field.metadata.getString(metadataKey), statementType) + } + + /** + * Parses and analyzes the DEFAULT column SQL string, returning an error upon failure. + * + * @return Result of the analysis and constant-folding operation. + */ + def analyze( + colName: String, + dataType: DataType, + defaultSQL: String, + statementType: String): Expression = { // Parse the expression. 
- val colText: String = field.metadata.getString(metadataKey) lazy val parser = new CatalystSqlParser() val parsed: Expression = try { - parser.parseExpression(colText) + parser.parseExpression(defaultSQL) } catch { case ex: ParseException => throw new AnalysisException( s"Failed to execute $statementType command because the destination table column " + - s"${field.name} has a DEFAULT value of $colText which fails to parse as a valid " + + s"$colName has a DEFAULT value of $defaultSQL which fails to parse as a valid " + s"expression: ${ex.getMessage}") } // Check invariants before moving on to analysis. @@ -170,28 +182,28 @@ object ResolveDefaultColumns { // Analyze the parse result. val plan = try { val analyzer: Analyzer = DefaultColumnAnalyzer - val analyzed = analyzer.execute(Project(Seq(Alias(parsed, field.name)()), OneRowRelation())) + val analyzed = analyzer.execute(Project(Seq(Alias(parsed, colName)()), OneRowRelation())) analyzer.checkAnalysis(analyzed) ConstantFolding(analyzed) } catch { case ex: AnalysisException => throw new AnalysisException( s"Failed to execute $statementType command because the destination table column " + - s"${field.name} has a DEFAULT value of $colText which fails to resolve as a valid " + + s"$colName has a DEFAULT value of $defaultSQL which fails to resolve as a valid " + s"expression: ${ex.getMessage}") } val analyzed: Expression = plan.collectFirst { case Project(Seq(a: Alias), OneRowRelation()) => a.child }.get // Perform implicit coercion from the provided expression type to the required column type. - if (field.dataType == analyzed.dataType) { + if (dataType == analyzed.dataType) { analyzed - } else if (Cast.canUpCast(analyzed.dataType, field.dataType)) { - Cast(analyzed, field.dataType) + } else if (Cast.canUpCast(analyzed.dataType, dataType)) { + Cast(analyzed, dataType) } else { throw new AnalysisException( s"Failed to execute $statementType command because the destination table column " + - s"${field.name} has a DEFAULT value with type ${field.dataType}, but the " + + s"$colName has a DEFAULT value with type $dataType, but the " + s"statement provided a value of incompatible type ${analyzed.dataType}") } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala index 0c9282f9675fa..12858887bb5b4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala @@ -21,10 +21,12 @@ import scala.collection.mutable import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.util.quoteIfNeeded import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, IdentityTransform, LogicalExpressions, Transform} import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} +import org.apache.spark.sql.types.StructType /** * Conversion helpers for working with v2 [[CatalogPlugin]]. 
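The refactored analyze overload above takes the column name, target data type and DEFAULT SQL string directly, so ALTER TABLE code paths can build a v2 ColumnDefaultValue without going through a StructField (see QualifiedColType.getV2Default earlier in this patch). A minimal sketch of that flow, assuming code inside Spark's catalyst module and a hypothetical column named "col":

import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
import org.apache.spark.sql.connector.catalog.ColumnDefaultValue
import org.apache.spark.sql.connector.expressions.LiteralValue
import org.apache.spark.sql.types.IntegerType

// The SQL text stored for "col INT DEFAULT 40 + 2"; analyze() parses, resolves and
// constant-folds it against the target column type.
val defaultSQL = "40 + 2"
val folded = ResolveDefaultColumns.analyze("col", IntegerType, defaultSQL, "ALTER TABLE")
assert(folded.resolved && folded.foldable)
// The SQL string is kept for re-evaluation on future writes; the folded literal (42) becomes
// the "exist default" used to back-fill rows written before the column existed.
val v2Default = new ColumnDefaultValue(defaultSQL, LiteralValue(folded.eval(), IntegerType))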
@@ -183,6 +185,11 @@ private[sql] object CatalogV2Implicits { } } + implicit class ColumnsHelper(columns: Array[Column]) { + def asSchema: StructType = CatalogV2Util.v2ColumnsToStructType(columns) + def toAttributes: Seq[AttributeReference] = asSchema.toAttributes + } + def parseColumnPath(name: String): Seq[String] = { CatalystSqlParser.parseMultipartIdentifier(name) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index 72c557c8d7726..9b481356fa603 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -23,12 +23,14 @@ import java.util.Collections import scala.collection.JavaConverters._ import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec} +import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec} import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._ import org.apache.spark.sql.connector.catalog.TableChange._ import org.apache.spark.sql.connector.catalog.functions.UnboundFunction +import org.apache.spark.sql.connector.expressions.LiteralValue import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation -import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType} +import org.apache.spark.sql.types.{ArrayType, MapType, Metadata, MetadataBuilder, StructField, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.Utils @@ -142,8 +144,7 @@ private[sql] object CatalogV2Util { add.fieldNames match { case Array(name) => val field = StructField(name, add.dataType, nullable = add.isNullable) - val fieldWithDefault: StructField = - Option(add.defaultValue).map(field.withCurrentDefaultValue).getOrElse(field) + val fieldWithDefault: StructField = encodeDefaultValue(add.defaultValue(), field) val fieldWithComment: StructField = Option(add.comment).map(fieldWithDefault.withComment).getOrElse(fieldWithDefault) addField(schema, fieldWithComment, add.position(), tableProvider, statementType, true) @@ -151,8 +152,7 @@ private[sql] object CatalogV2Util { replace(schema, names.init, parent => parent.dataType match { case parentType: StructType => val field = StructField(names.last, add.dataType, nullable = add.isNullable) - val fieldWithDefault: StructField = - Option(add.defaultValue).map(field.withCurrentDefaultValue).getOrElse(field) + val fieldWithDefault: StructField = encodeDefaultValue(add.defaultValue(), field) val fieldWithComment: StructField = Option(add.comment).map(fieldWithDefault.withComment) .getOrElse(fieldWithDefault) @@ -431,4 +431,83 @@ private[sql] object CatalogV2Util { .getOrElse(catalogManager.v2SessionCatalog) .asTableCatalog } + + /** + * Converts DS v2 columns to StructType, which encodes column comment and default value to + * StructField metadata. This is mainly used to define the schema of v2 scan, w.r.t. the columns + * of the v2 table. 
+ */ + def v2ColumnsToStructType(columns: Array[Column]): StructType = { + StructType(columns.map(v2ColumnToStructField)) + } + + private def v2ColumnToStructField(col: Column): StructField = { + val metadata = Option(col.metadataInJSON()).map(Metadata.fromJson).getOrElse(Metadata.empty) + var f = StructField(col.name(), col.dataType(), col.nullable(), metadata) + Option(col.comment()).foreach { comment => + f = f.withComment(comment) + } + Option(col.defaultValue()).foreach { default => + f = encodeDefaultValue(default, f) + } + f + } + + // For built-in file sources, we encode the default value in StructField metadata. An analyzer + // rule will check the special metadata and change the DML input plan to fill the default value. + private def encodeDefaultValue(defaultValue: ColumnDefaultValue, f: StructField): StructField = { + Option(defaultValue).map { default => + // The "exist default" is used to back-fill the existing data when new columns are added, and + // should be a fixed value which was evaluated at the definition time. For example, if the + // default value is `current_date()`, the "exist default" should be the value of + // `current_date()` when the column was defined/altered, instead of when back-fall happens. + // Note: the back-fill here is a logical concept. The data source can keep the existing + // data unchanged and let the data reader to return "exist default" for missing + // columns. + val existingDefault = Literal(default.getValue.value(), default.getValue.dataType()).sql + f.withExistenceDefaultValue(existingDefault).withCurrentDefaultValue(default.getSql) + }.getOrElse(f) + } + + /** + * Converts a StructType to DS v2 columns, which decodes the StructField metadata to v2 column + * comment and default value. This is mainly used to generate DS v2 columns from table schema in + * DDL commands, so that Spark can pass DS v2 columns to DS v2 createTable and related APIs. 
+ */ + def structTypeToV2Columns(schema: StructType): Array[Column] = { + schema.fields.map(structFieldToV2Column) + } + + private def structFieldToV2Column(f: StructField): Column = { + def createV2Column(defaultValue: ColumnDefaultValue, metadata: Metadata): Column = { + val metadataJSON = if (metadata == Metadata.empty) { + null + } else { + metadata.json + } + Column.create( + f.name, f.dataType, f.nullable, f.getComment().orNull, defaultValue, metadataJSON) + } + if (f.getCurrentDefaultValue().isDefined && f.getExistenceDefaultValue().isDefined) { + val e = analyze(f, EXISTS_DEFAULT_COLUMN_METADATA_KEY) + assert(e.resolved && e.foldable, + "The existence default value must be a simple SQL string that is resolved and foldable, " + + "but got: " + f.getExistenceDefaultValue().get) + val defaultValue = new ColumnDefaultValue( + f.getCurrentDefaultValue().get, LiteralValue(e.eval(), f.dataType)) + val cleanedMetadata = new MetadataBuilder() + .withMetadata(f.metadata) + .remove("comment") + .remove(CURRENT_DEFAULT_COLUMN_METADATA_KEY) + .remove(EXISTS_DEFAULT_COLUMN_METADATA_KEY) + .build() + createV2Column(defaultValue, cleanedMetadata) + } else { + val cleanedMetadata = new MetadataBuilder() + .withMetadata(f.metadata) + .remove("comment") + .build() + createV2Column(null, cleanedMetadata) + } + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/RowLevelOperationTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/RowLevelOperationTable.scala index d1f7ba000c62a..07acacd9a35d3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/RowLevelOperationTable.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/RowLevelOperationTable.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.connector.write import java.util -import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsRowLevelOperations, SupportsWrite, Table, TableCapability} +import org.apache.spark.sql.connector.catalog.{Column, SupportsRead, SupportsRowLevelOperations, SupportsWrite, Table, TableCapability} import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -38,6 +38,7 @@ private[sql] case class RowLevelOperationTable( override def name: String = table.name override def schema: StructType = table.schema + override def columns: Array[Column] = table.columns() override def capabilities: util.Set[TableCapability] = table.capabilities override def toString: String = table.toString diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index 51ef3dda8171e..c170b7ae672b1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -189,9 +189,10 @@ object DataSourceV2Relation { catalog: Option[CatalogPlugin], identifier: Option[Identifier], options: CaseInsensitiveStringMap): DataSourceV2Relation = { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ // The v2 source may return schema containing char/varchar type. We replace char/varchar // with "annotated" string type here as the query engine doesn't support char/varchar yet. 
- val schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(table.schema) + val schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(table.columns.asSchema) DataSourceV2Relation(table, schema.toAttributes, catalog, identifier, options) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala new file mode 100644 index 0000000000000..5ab3f83eeae56 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.internal.connector + +import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue} +import org.apache.spark.sql.types.DataType + +// The standard concrete implementation of data source V2 column. +case class ColumnImpl( + name: String, + dataType: DataType, + nullable: Boolean, + comment: String, + defaultValue: ColumnDefaultValue, + metadataInJSON: String) extends Column diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SimpleTableProvider.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SimpleTableProvider.scala index 7bfe1df1117ac..f8b237195fa88 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SimpleTableProvider.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/SimpleTableProvider.scala @@ -37,7 +37,8 @@ trait SimpleTableProvider extends TableProvider { } override def inferSchema(options: CaseInsensitiveStringMap): StructType = { - getOrLoadTable(options).schema() + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + getOrLoadTable(options).columns.asSchema } override def getTable( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala index 032b04bb887de..6be50f36c848a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.util.quoteIdentifier import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction} -import org.apache.spark.sql.connector.expressions.LogicalExpressions +import org.apache.spark.sql.connector.expressions.{LogicalExpressions, Transform} import org.apache.spark.sql.internal.SQLConf 
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType} import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -37,6 +37,7 @@ class CatalogSuite extends SparkFunSuite { import CatalogV2Implicits._ private val emptyProps: util.Map[String, String] = Collections.emptyMap[String, String] + private val emptyTrans: Array[Transform] = Array.empty private val schema: StructType = new StructType() .add("id", IntegerType) .add("data", StringType) @@ -74,13 +75,13 @@ class CatalogSuite extends SparkFunSuite { intercept[NoSuchNamespaceException](catalog.listTables(Array("ns"))) - catalog.createTable(ident1, schema, Array.empty, emptyProps) + catalog.createTable(ident1, schema, emptyTrans, emptyProps) assert(catalog.listTables(Array("ns")).toSet == Set(ident1)) intercept[NoSuchNamespaceException](catalog.listTables(Array("ns2"))) - catalog.createTable(ident3, schema, Array.empty, emptyProps) - catalog.createTable(ident2, schema, Array.empty, emptyProps) + catalog.createTable(ident3, schema, emptyTrans, emptyProps) + catalog.createTable(ident2, schema, emptyTrans, emptyProps) assert(catalog.listTables(Array("ns")).toSet == Set(ident1, ident2)) assert(catalog.listTables(Array("ns2")).toSet == Set(ident3)) @@ -100,7 +101,7 @@ class CatalogSuite extends SparkFunSuite { assert(!catalog.tableExists(testIdent)) - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name) assert(parsed == Seq("test", "`", ".", "test_table")) @@ -118,7 +119,7 @@ class CatalogSuite extends SparkFunSuite { assert(!catalog.tableExists(testIdent)) - val table = catalog.createTable(testIdent, schema, Array.empty, properties) + val table = catalog.createTable(testIdent, schema, emptyTrans, properties) val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name) assert(parsed == Seq("test", "`", ".", "test_table")) @@ -133,10 +134,10 @@ class CatalogSuite extends SparkFunSuite { assert(!catalog.tableExists(testIdent)) - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) val exc = intercept[TableAlreadyExistsException] { - catalog.createTable(testIdent, schema, Array.empty, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) } checkErrorTableAlreadyExists(exc, testIdentQuoted) @@ -149,7 +150,7 @@ class CatalogSuite extends SparkFunSuite { assert(!catalog.tableExists(testIdent)) - catalog.createTable(testIdent, schema, Array.empty, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(catalog.tableExists(testIdent)) @@ -161,7 +162,7 @@ class CatalogSuite extends SparkFunSuite { test("loadTable") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) val loaded = catalog.loadTable(testIdent) assert(table.name == loaded.name) @@ -182,7 +183,7 @@ class CatalogSuite extends SparkFunSuite { test("invalidateTable") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) catalog.invalidateTable(testIdent) val loaded = catalog.loadTable(testIdent) @@ -203,7 +204,7 @@ class CatalogSuite 
extends SparkFunSuite { test("alterTable: add property") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(table.properties.asScala == Map()) @@ -222,7 +223,7 @@ class CatalogSuite extends SparkFunSuite { val properties = new util.HashMap[String, String]() properties.put("prop-1", "1") - val table = catalog.createTable(testIdent, schema, Array.empty, properties) + val table = catalog.createTable(testIdent, schema, emptyTrans, properties) assert(table.properties.asScala == Map("prop-1" -> "1")) @@ -241,7 +242,7 @@ class CatalogSuite extends SparkFunSuite { val properties = new util.HashMap[String, String]() properties.put("prop-1", "1") - val table = catalog.createTable(testIdent, schema, Array.empty, properties) + val table = catalog.createTable(testIdent, schema, emptyTrans, properties) assert(table.properties.asScala == Map("prop-1" -> "1")) @@ -257,7 +258,7 @@ class CatalogSuite extends SparkFunSuite { test("alterTable: remove missing property") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(table.properties.asScala == Map()) @@ -273,7 +274,7 @@ class CatalogSuite extends SparkFunSuite { test("alterTable: add top-level column") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(table.schema == schema) @@ -285,7 +286,7 @@ class CatalogSuite extends SparkFunSuite { test("alterTable: add required column") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(table.schema == schema) @@ -298,7 +299,7 @@ class CatalogSuite extends SparkFunSuite { test("alterTable: add column with comment") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(table.schema == schema) @@ -315,7 +316,7 @@ class CatalogSuite extends SparkFunSuite { val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) val tableSchema = schema.add("point", pointStruct) - val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) assert(table.schema == tableSchema) @@ -330,7 +331,7 @@ class CatalogSuite extends SparkFunSuite { test("alterTable: add column to primitive field fails") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(table.schema == schema) @@ -348,7 +349,7 @@ class CatalogSuite extends SparkFunSuite { test("alterTable: add field to missing column fails") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(table.schema == schema) @@ -364,7 +365,7 @@ class CatalogSuite extends SparkFunSuite { test("alterTable: update column data type") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, 
emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(table.schema == schema) @@ -380,7 +381,7 @@ class CatalogSuite extends SparkFunSuite { val originalSchema = new StructType() .add("id", IntegerType, nullable = false) .add("data", StringType) - val table = catalog.createTable(testIdent, originalSchema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, originalSchema, emptyTrans, emptyProps) assert(table.schema == originalSchema) @@ -394,7 +395,7 @@ class CatalogSuite extends SparkFunSuite { test("alterTable: update missing column fails") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(table.schema == schema) @@ -410,7 +411,7 @@ class CatalogSuite extends SparkFunSuite { test("alterTable: add comment") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(table.schema == schema) @@ -426,7 +427,7 @@ class CatalogSuite extends SparkFunSuite { test("alterTable: replace comment") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(table.schema == schema) @@ -445,7 +446,7 @@ class CatalogSuite extends SparkFunSuite { test("alterTable: add comment to missing column fails") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(table.schema == schema) @@ -461,7 +462,7 @@ class CatalogSuite extends SparkFunSuite { test("alterTable: rename top-level column") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(table.schema == schema) @@ -478,7 +479,7 @@ class CatalogSuite extends SparkFunSuite { val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) val tableSchema = schema.add("point", pointStruct) - val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) assert(table.schema == tableSchema) @@ -497,7 +498,7 @@ class CatalogSuite extends SparkFunSuite { val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) val tableSchema = schema.add("point", pointStruct) - val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) assert(table.schema == tableSchema) @@ -513,7 +514,7 @@ class CatalogSuite extends SparkFunSuite { test("alterTable: rename missing column fails") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(table.schema == schema) @@ -532,7 +533,7 @@ class CatalogSuite extends SparkFunSuite { val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) val tableSchema = schema.add("point", pointStruct) - val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, tableSchema, 
emptyTrans, emptyProps) assert(table.schema == tableSchema) @@ -549,7 +550,7 @@ class CatalogSuite extends SparkFunSuite { test("alterTable: delete top-level column") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(table.schema == schema) @@ -566,7 +567,7 @@ class CatalogSuite extends SparkFunSuite { val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) val tableSchema = schema.add("point", pointStruct) - val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) assert(table.schema == tableSchema) @@ -582,7 +583,7 @@ class CatalogSuite extends SparkFunSuite { test("alterTable: delete missing column fails") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(table.schema == schema) @@ -604,7 +605,7 @@ class CatalogSuite extends SparkFunSuite { val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) val tableSchema = schema.add("point", pointStruct) - val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) assert(table.schema == tableSchema) @@ -635,7 +636,7 @@ class CatalogSuite extends SparkFunSuite { assert(!catalog.tableExists(testIdent)) - catalog.createTable(testIdent, schema, Array.empty, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(catalog.tableExists(testIdent)) @@ -667,7 +668,7 @@ class CatalogSuite extends SparkFunSuite { assert(!catalog.tableExists(testIdent)) assert(!catalog.tableExists(testIdentNew)) - catalog.createTable(testIdent, schema, Array.empty, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(catalog.tableExists(testIdent)) catalog.renameTable(testIdent, testIdentNew) @@ -692,8 +693,8 @@ class CatalogSuite extends SparkFunSuite { assert(!catalog.tableExists(testIdent)) assert(!catalog.tableExists(testIdentNew)) - catalog.createTable(testIdent, schema, Array.empty, emptyProps) - catalog.createTable(testIdentNew, schema, Array.empty, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdentNew, schema, emptyTrans, emptyProps) assert(catalog.tableExists(testIdent)) assert(catalog.tableExists(testIdentNew)) @@ -719,8 +720,8 @@ class CatalogSuite extends SparkFunSuite { val ident1 = Identifier.of(Array("ns1", "ns2"), "test_table_1") val ident2 = Identifier.of(Array("ns1", "ns2"), "test_table_2") - catalog.createTable(ident1, schema, Array.empty, emptyProps) - catalog.createTable(ident2, schema, Array.empty, emptyProps) + catalog.createTable(ident1, schema, emptyTrans, emptyProps) + catalog.createTable(ident2, schema, emptyTrans, emptyProps) assert(catalog.listNamespaces === Array(Array("ns1"))) assert(catalog.listNamespaces(Array()) === Array(Array("ns1"))) @@ -734,8 +735,8 @@ class CatalogSuite extends SparkFunSuite { val ident2 = Identifier.of(Array("ns1", "ns2"), "test_table_2") catalog.createNamespace(Array("ns1"), Map("property" -> "value").asJava) - catalog.createTable(ident1, schema, Array.empty, emptyProps) - catalog.createTable(ident2, schema, Array.empty, emptyProps) + catalog.createTable(ident1, schema, 
emptyTrans, emptyProps) + catalog.createTable(ident2, schema, emptyTrans, emptyProps) assert(catalog.listNamespaces === Array(Array("ns1"))) assert(catalog.listNamespaces(Array()) === Array(Array("ns1"))) @@ -756,7 +757,7 @@ class CatalogSuite extends SparkFunSuite { test("loadNamespaceMetadata: no metadata, table exists") { val catalog = newCatalog() - catalog.createTable(testIdent, schema, Array.empty, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) val metadata = catalog.loadNamespaceMetadata(testNs) @@ -777,7 +778,7 @@ class CatalogSuite extends SparkFunSuite { val catalog = newCatalog() catalog.createNamespace(testNs, Map("property" -> "value").asJava) - catalog.createTable(testIdent, schema, Array.empty, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) val metadata = catalog.loadNamespaceMetadata(testNs) @@ -810,7 +811,7 @@ class CatalogSuite extends SparkFunSuite { test("createNamespace: fail if namespace already exists from table") { val catalog = newCatalog() - catalog.createTable(testIdent, schema, Array.empty, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(catalog.namespaceExists(testNs) === true) assert(catalog.loadNamespaceMetadata(testNs).asScala === Map.empty) @@ -852,7 +853,7 @@ class CatalogSuite extends SparkFunSuite { val catalog = newCatalog() catalog.createNamespace(testNs, Map("property" -> "value").asJava) - catalog.createTable(testIdent, schema, Array.empty, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(catalog.dropNamespace(testNs, cascade = true)) @@ -882,7 +883,7 @@ class CatalogSuite extends SparkFunSuite { test("alterNamespace: create metadata if missing and table exists") { val catalog = newCatalog() - catalog.createTable(testIdent, schema, Array.empty, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) catalog.alterNamespace(testNs, NamespaceChange.setProperty("property", "value")) @@ -902,7 +903,7 @@ class CatalogSuite extends SparkFunSuite { test("truncate non-partitioned table") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) .asInstanceOf[InMemoryTable] table.withData(Array( new BufferedRows("3").withRow(InternalRow(0, "abc", "3")), @@ -920,7 +921,7 @@ class CatalogSuite extends SparkFunSuite { new StructType() .add("col0", IntegerType) .add("part0", IntegerType), - Array(LogicalExpressions.identity(LogicalExpressions.parseReference("part0"))), + Array[Transform](LogicalExpressions.identity(LogicalExpressions.parseReference("part0"))), util.Collections.emptyMap[String, String]) val partTable = table.asInstanceOf[InMemoryPartitionTable] val partIdent = InternalRow.apply(0) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogV2UtilSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogV2UtilSuite.scala index da5cfab8be3c7..eda401ceb6bdf 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogV2UtilSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogV2UtilSuite.scala @@ -21,14 +21,14 @@ import org.mockito.Mockito.{mock, when} import org.apache.spark.SparkFunSuite import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.IntegerType class 
CatalogV2UtilSuite extends SparkFunSuite { test("Load relation should encode the identifiers for V2Relations") { val testCatalog = mock(classOf[TableCatalog]) val ident = mock(classOf[Identifier]) val table = mock(classOf[Table]) - when(table.schema()).thenReturn(new StructType().add("i", "int")) + when(table.columns()).thenReturn(Array(Column.create("i", IntegerType))) when(testCatalog.loadTable(ident)).thenReturn(table) val r = CatalogV2Util.loadRelation(testCatalog, ident) assert(r.isDefined) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala index 06ee588329cee..50bea2b8d2f27 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala @@ -84,6 +84,7 @@ class BasicInMemoryTableCatalog extends TableCatalog { invalidatedTables.add(ident) } + // TODO: remove it when no tests call this deprecated method. override def createTable( ident: Identifier, schema: StructType, @@ -93,6 +94,15 @@ class BasicInMemoryTableCatalog extends TableCatalog { Array.empty, None) } + override def createTable( + ident: Identifier, + columns: Array[Column], + partitions: Array[Transform], + properties: util.Map[String, String]): Table = { + val schema = CatalogV2Util.v2ColumnsToStructType(columns) + createTable(ident, schema, partitions, properties) + } + def createTable( ident: Identifier, schema: StructType, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala index 0590ca721cc8d..90ed106d8ed1b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala @@ -22,7 +22,7 @@ import java.util import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionsAlreadyExistException} -import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference} +import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform} import org.apache.spark.sql.types.{IntegerType, StringType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -41,7 +41,7 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite { .add("id", IntegerType) .add("data", StringType) .add("dt", StringType), - Array(LogicalExpressions.identity(ref("dt"))), + Array[Transform](LogicalExpressions.identity(ref("dt"))), util.Collections.emptyMap[String, String]) newCatalog } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala index ddd08185527e8..40114d063aada 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala @@ -24,7 +24,7 @@ import scala.collection.JavaConverters._ 
import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionsAlreadyExistException} -import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference} +import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform} import org.apache.spark.sql.types.{IntegerType, StringType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -43,7 +43,7 @@ class SupportsPartitionManagementSuite extends SparkFunSuite { .add("id", IntegerType) .add("data", StringType) .add("dt", StringType), - Array(LogicalExpressions.identity(ref("dt"))), + Array[Transform](LogicalExpressions.identity(ref("dt"))), util.Collections.emptyMap[String, String]) newCatalog } @@ -164,7 +164,8 @@ class SupportsPartitionManagementSuite extends SparkFunSuite { .add("col0", IntegerType) .add("part0", IntegerType) .add("part1", StringType), - Array(LogicalExpressions.identity(ref("part0")), LogicalExpressions.identity(ref("part1"))), + Array[Transform]( + LogicalExpressions.identity(ref("part0")), LogicalExpressions.identity(ref("part1"))), util.Collections.emptyMap[String, String]) val partTable = table.asInstanceOf[InMemoryPartitionTable] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 9255aa2effc5d..635562ab54d56 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -245,7 +245,6 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi case create: V2CreateTablePlan if create.childrenResolved => val schema = create.tableSchema val partitioning = create.partitioning - val identifier = create.tableName val isCaseSensitive = conf.caseSensitiveAnalysis // Check that columns are not duplicated in the schema val flattenedSchema = SchemaUtils.explodeNestedFieldNames(schema) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala index abc6bc60d96a3..550578443283f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala @@ -23,15 +23,14 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.TableSpec -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, TableCatalog} +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.errors.QueryCompilationErrors -import org.apache.spark.sql.types.StructType case class CreateTableExec( catalog: TableCatalog, identifier: Identifier, - tableSchema: StructType, + columns: Array[Column], partitioning: Seq[Transform], tableSpec: TableSpec, ignoreIfExists: Boolean) extends LeafV2CommandExec { @@ -42,7 +41,7 @@ case class CreateTableExec( override protected def run(): Seq[InternalRow] = { if (!catalog.tableExists(identifier)) { try { - 
catalog.createTable(identifier, tableSchema, partitioning.toArray, tableProperties.asJava) + catalog.createTable(identifier, columns, partitioning.toArray, tableProperties.asJava) } catch { case _: TableAlreadyExistsException if ignoreIfExists => logWarning(s"Table ${identifier.quoted} was created concurrently. Ignoring.") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 757b66e1534ae..b45de06371cfe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.{toPrettySQL, ResolveDefaultColumns, V2ExpressionBuilder} import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsDeleteV2, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog, TruncatableTable} +import org.apache.spark.sql.connector.catalog.CatalogV2Util.structTypeToV2Columns import org.apache.spark.sql.connector.catalog.index.SupportsIndex import org.apache.spark.sql.connector.expressions.{FieldReference, LiteralValue} import org.apache.spark.sql.connector.expressions.filter.{And => V2And, Not => V2Not, Or => V2Or, Predicate} @@ -177,7 +178,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat val newSchema: StructType = ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults( schema, tableSpec.provider, "CREATE TABLE", false) - CreateTableExec(catalog.asTableCatalog, ident, newSchema, + CreateTableExec(catalog.asTableCatalog, ident, structTypeToV2Columns(newSchema), partitioning, qualifyLocInTableSpec(tableSpec), ifNotExists) :: Nil case CreateTableAsSelect(ResolvedIdentifier(catalog, ident), parts, query, tableSpec, @@ -200,12 +201,13 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat val newSchema: StructType = ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults( schema, tableSpec.provider, "CREATE TABLE", false) + val v2Columns = structTypeToV2Columns(newSchema) catalog match { case staging: StagingTableCatalog => - AtomicReplaceTableExec(staging, ident, newSchema, parts, + AtomicReplaceTableExec(staging, ident, v2Columns, parts, qualifyLocInTableSpec(tableSpec), orCreate = orCreate, invalidateCache) :: Nil case _ => - ReplaceTableExec(catalog.asTableCatalog, ident, newSchema, parts, + ReplaceTableExec(catalog.asTableCatalog, ident, v2Columns, parts, qualifyLocInTableSpec(tableSpec), orCreate = orCreate, invalidateCache) :: Nil } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala index 0bd25064e353c..3cb1a74417db1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala @@ -90,8 +90,9 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister { private var t: Table = null override def inferSchema(options: CaseInsensitiveStringMap): StructType = { + import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ if (t == null) t = getTable(options) - t.schema() + t.columns.asSchema } // TODO: implement a light-weight partition inference which only looks at the path of one leaf diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala index ea221980fed85..55d97577d5781 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala @@ -23,16 +23,15 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.TableSpec -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog} +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.errors.QueryCompilationErrors -import org.apache.spark.sql.types.StructType import org.apache.spark.util.Utils case class ReplaceTableExec( catalog: TableCatalog, ident: Identifier, - tableSchema: StructType, + columns: Array[Column], partitioning: Seq[Transform], tableSpec: TableSpec, orCreate: Boolean, @@ -48,7 +47,7 @@ case class ReplaceTableExec( } else if (!orCreate) { throw QueryCompilationErrors.cannotReplaceMissingTableError(ident) } - catalog.createTable(ident, tableSchema, partitioning.toArray, tableProperties.asJava) + catalog.createTable(ident, columns, partitioning.toArray, tableProperties.asJava) Seq.empty } @@ -58,7 +57,7 @@ case class ReplaceTableExec( case class AtomicReplaceTableExec( catalog: StagingTableCatalog, identifier: Identifier, - tableSchema: StructType, + columns: Array[Column], partitioning: Seq[Transform], tableSpec: TableSpec, orCreate: Boolean, @@ -73,11 +72,11 @@ case class AtomicReplaceTableExec( } val staged = if (orCreate) { catalog.stageCreateOrReplace( - identifier, tableSchema, partitioning.toArray, tableProperties.asJava) + identifier, columns, partitioning.toArray, tableProperties.asJava) } else if (catalog.tableExists(identifier)) { try { catalog.stageReplace( - identifier, tableSchema, partitioning.toArray, tableProperties.asJava) + identifier, columns, partitioning.toArray, tableProperties.asJava) } catch { case e: NoSuchTableException => throw QueryCompilationErrors.cannotReplaceMissingTableError(identifier, Some(e)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala index ec40ad70b7964..5712159ddc800 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala @@ -59,7 +59,8 @@ case class ShowCreateTableExec( } private def showTableDataColumns(table: Table, builder: StringBuilder): Unit = { - val columns = CharVarcharUtils.getRawSchema(table.schema(), conf).fields.map(_.toDDL) + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + val columns = 
CharVarcharUtils.getRawSchema(table.columns.asSchema, conf).fields.map(_.toDDL) builder ++= concatByMultiLines(columns) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index b9afe71d243ea..461e948b02979 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -26,7 +26,7 @@ import scala.collection.mutable import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType, CatalogUtils, SessionCatalog} -import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange, V1Table} +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Column, FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange, V1Table} import org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty import org.apache.spark.sql.connector.catalog.functions.UnboundFunction import org.apache.spark.sql.connector.expressions.Transform @@ -92,6 +92,15 @@ class V2SessionCatalog(catalog: SessionCatalog) catalog.refreshTable(ident.asTableIdentifier) } + override def createTable( + ident: Identifier, + columns: Array[Column], + partitions: Array[Transform], + properties: util.Map[String, String]): Table = { + createTable(ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties) + } + + // TODO: remove it when no tests call this deprecated method. 
override def createTable( ident: Identifier, schema: StructType, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 490b708222396..c53c603ffaa08 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -86,8 +86,9 @@ case class CreateTableAsSelectExec( throw QueryCompilationErrors.tableAlreadyExistsError(ident) } - val schema = CharVarcharUtils.getRawSchema(query.schema, conf).asNullable - val table = catalog.createTable(ident, schema, + val columns = CatalogV2Util.structTypeToV2Columns( + CharVarcharUtils.getRawSchema(query.schema, conf).asNullable) + val table = catalog.createTable(ident, columns, partitioning.toArray, properties.asJava) writeToTable(catalog, table, writeOptions, ident) } @@ -125,9 +126,10 @@ case class AtomicCreateTableAsSelectExec( throw QueryCompilationErrors.tableAlreadyExistsError(ident) } - val schema = CharVarcharUtils.getRawSchema(query.schema, conf).asNullable + val columns = CatalogV2Util.structTypeToV2Columns( + CharVarcharUtils.getRawSchema(query.schema, conf).asNullable) val stagedTable = catalog.stageCreate( - ident, schema, partitioning.toArray, properties.asJava) + ident, columns, partitioning.toArray, properties.asJava) writeToTable(catalog, stagedTable, writeOptions, ident) } @@ -174,9 +176,10 @@ case class ReplaceTableAsSelectExec( } else if (!orCreate) { throw QueryCompilationErrors.cannotReplaceMissingTableError(ident) } - val schema = CharVarcharUtils.getRawSchema(query.schema, conf).asNullable + val columns = CatalogV2Util.structTypeToV2Columns( + CharVarcharUtils.getRawSchema(query.schema, conf).asNullable) val table = catalog.createTable( - ident, schema, partitioning.toArray, properties.asJava) + ident, columns, partitioning.toArray, properties.asJava) writeToTable(catalog, table, writeOptions, ident) } @@ -210,18 +213,19 @@ case class AtomicReplaceTableAsSelectExec( val properties = CatalogV2Util.convertTableProperties(tableSpec) override protected def run(): Seq[InternalRow] = { - val schema = CharVarcharUtils.getRawSchema(query.schema, conf).asNullable + val columns = CatalogV2Util.structTypeToV2Columns( + CharVarcharUtils.getRawSchema(query.schema, conf).asNullable) if (catalog.tableExists(ident)) { val table = catalog.loadTable(ident) invalidateCache(catalog, table, ident) } val staged = if (orCreate) { catalog.stageCreateOrReplace( - ident, schema, partitioning.toArray, properties.asJava) + ident, columns, partitioning.toArray, properties.asJava) } else if (catalog.tableExists(ident)) { try { catalog.stageReplace( - ident, schema, partitioning.toArray, properties.asJava) + ident, columns, partitioning.toArray, properties.asJava) } catch { case e: NoSuchTableException => throw QueryCompilationErrors.cannotReplaceMissingTableError(ident, Some(e)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala index d9a3a074ce6a1..1ab88cd41d875 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala @@ -86,7 +86,10 @@ class TextSocketTable(host: String, port: Int, numPartitions: Int, includeTimest } override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = () => new Scan { - override def readSchema(): StructType = schema() + override def readSchema(): StructType = { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + columns.asSchema + } override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = { new TextSocketMicroBatchStream(host, port, numPartitions) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index d4621468f846e..13f7695947e7e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -180,11 +180,12 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._ table match { case _: SupportsRead if table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ) => + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ Dataset.ofRows( sparkSession, StreamingRelationV2( Some(provider), source, table, dsOptions, - table.schema.toAttributes, None, None, v1Relation)) + table.columns.asSchema.toAttributes, None, None, v1Relation)) // fallback to v1 // TODO (SPARK-27483): we should move this fallback logic to an analyzer rule. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala index 158e1634d58c5..3678f29ab49a1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode} import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect} import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog} +import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.internal.SQLConf @@ -178,7 +179,7 @@ class DataSourceV2DataFrameSuite testCatalog.createTable( Identifier.of(Array(), "table_name"), new StructType().add("i", "interval"), - Array.empty, Collections.emptyMap[String, String]) + Array.empty[Transform], Collections.emptyMap[String, String]) val df = sql(s"select interval 1 millisecond as i") val v2Writer = df.writeTo("testcat.table_name") val e1 = intercept[AnalysisException](v2Writer.append()) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index f9b7b168d14f3..c4dabaec8880b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -31,10 +31,11 @@ import 
org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableExceptio import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.logical.ColumnStat import org.apache.spark.sql.catalyst.statsEstimation.StatsEstimationTestBase -import org.apache.spark.sql.catalyst.util.{DateTimeUtils, ResolveDefaultColumns} -import org.apache.spark.sql.connector.catalog._ +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.connector.catalog.{Column => ColumnV2, _} import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership +import org.apache.spark.sql.connector.expressions.LiteralValue import org.apache.spark.sql.errors.QueryErrorsBase import org.apache.spark.sql.execution.FilterExec import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper @@ -45,7 +46,7 @@ import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode, V2_SESSION_CATALOG_IMPLEMENTATION} import org.apache.spark.sql.internal.connector.SimpleTableProvider import org.apache.spark.sql.sources.SimpleScanSource -import org.apache.spark.sql.types.{LongType, MetadataBuilder, StringType, StructField, StructType} +import org.apache.spark.sql.types.{LongType, StringType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.unsafe.types.UTF8String @@ -585,7 +586,7 @@ class DataSourceV2SQLSuiteV1Filter assert(maybeReplacedTable === table, "Table should not have changed.") } - test("ReplaceTable: Erases the table contents and changes the metadata.") { + test("ReplaceTable: Erases the table contents and changes the metadata") { spark.sql(s"CREATE TABLE testcat.table_name USING $v2Source AS SELECT id, data FROM source") val testCatalog = catalog("testcat").asTableCatalog @@ -598,14 +599,11 @@ class DataSourceV2SQLSuiteV1Filter assert(replaced.asInstanceOf[InMemoryTable].rows.isEmpty, "Replaced table should have no rows after committing.") - assert(replaced.schema().fields.length === 1, + assert(replaced.columns.length === 1, "Replaced table should have new schema.") - val actual = replaced.schema().fields(0) - val expected = StructField("id", LongType, nullable = false, - new MetadataBuilder().putString( - ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "41 + 1") - .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "CAST(42 AS BIGINT)") - .build()) + val actual = replaced.columns.head + val expected = ColumnV2.create("id", LongType, false, null, + new ColumnDefaultValue("41 + 1", LiteralValue(42L, LongType)), null) assert(actual === expected, "Replaced table should have new schema with DEFAULT column metadata.") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala index 14b951e66db36..781e0f96eaf28 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala @@ -24,6 +24,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.sql.{AnalysisException, DataFrame, Encoders, QueryTest, Row} import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryRowLevelOperationTable, InMemoryRowLevelOperationTableCatalog} import 
org.apache.spark.sql.connector.expressions.LogicalExpressions._ +import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.{QueryExecution, SparkPlan} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.datasources.v2.{DeleteFromTableExec, ReplaceDataExec, WriteDeltaExec} @@ -564,7 +565,8 @@ abstract class DeleteFromTableSuiteBase protected def createTable(schemaString: String): Unit = { val schema = StructType.fromDDL(schemaString) - catalog.createTable(ident, schema, Array(identity(reference(Seq("dep")))), extraTableProps) + catalog.createTable( + ident, schema, Array[Transform](identity(reference(Seq("dep")))), extraTableProps) } protected def createAndInitTable(schemaString: String, jsonData: String): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala index 0a0aaa8021996..46586c622db79 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala @@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.collection.JavaConverters._ import org.apache.spark.sql.catalyst.catalog.CatalogTableType -import org.apache.spark.sql.connector.catalog.{DelegatingCatalogExtension, Identifier, Table, TableCatalog, V1Table} +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, DelegatingCatalogExtension, Identifier, Table, TableCatalog, V1Table} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.types.StructType @@ -64,6 +64,15 @@ private[connector] trait TestV2SessionCatalogBase[T <: Table] extends Delegating } } + override def createTable( + ident: Identifier, + columns: Array[Column], + partitions: Array[Transform], + properties: java.util.Map[String, String]): Table = { + createTable(ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties) + } + + // TODO: remove it when no tests call this deprecated method. 
override def createTable( ident: Identifier, schema: StructType, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala index b262e405d4ef6..f7905daa20a5e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala @@ -974,7 +974,7 @@ class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase } test("continuous mode allows unspecified distribution and empty ordering") { - catalog.createTable(ident, schema, Array.empty, emptyProps) + catalog.createTable(ident, schema, Array.empty[Transform], emptyProps) withTempDir { checkpointDir => val inputData = ContinuousMemoryStream[(Long, String)] diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 44b4166c07a6f..2cf4792b8c12f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -34,17 +34,17 @@ import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} import org.apache.spark.sql.catalyst.plans.logical.{AlterColumn, AnalysisOnlyCommand, AppendData, Assignment, CreateTable, CreateTableAsSelect, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, InsertIntoStatement, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, OverwriteByExpression, OverwritePartitionsDynamic, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns import org.apache.spark.sql.connector.FakeV2Provider -import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, SupportsDelete, Table, TableCapability, TableCatalog, V1Table} +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Column, ColumnDefaultValue, Identifier, SupportsDelete, Table, TableCapability, TableCatalog, V1Table} import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME -import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.connector.expressions.{LiteralValue, Transform} import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode} import org.apache.spark.sql.sources.SimpleScanSource -import org.apache.spark.sql.types.{BooleanType, CharType, DoubleType, IntegerType, LongType, MetadataBuilder, StringType, StructField, StructType} +import org.apache.spark.sql.types.{BooleanType, CharType, DoubleType, IntegerType, LongType, StringType, StructField, StructType, VarcharType} +import org.apache.spark.unsafe.types.UTF8String class PlanResolutionSuite extends AnalysisTest { import CatalystSqlParser._ @@ -54,21 +54,24 @@ class 
PlanResolutionSuite extends AnalysisTest { private val table: Table = { val t = mock(classOf[SupportsDelete]) - when(t.schema()).thenReturn(new StructType().add("i", "int").add("s", "string")) + when(t.columns()).thenReturn( + Array(Column.create("i", IntegerType), Column.create("s", StringType))) when(t.partitioning()).thenReturn(Array.empty[Transform]) t } private val table1: Table = { val t = mock(classOf[Table]) - when(t.schema()).thenReturn(new StructType().add("s", "string").add("i", "int")) + when(t.columns()).thenReturn( + Array(Column.create("s", StringType), Column.create("i", IntegerType))) when(t.partitioning()).thenReturn(Array.empty[Transform]) t } private val table2: Table = { val t = mock(classOf[Table]) - when(t.schema()).thenReturn(new StructType().add("i", "int").add("x", "string")) + when(t.columns()).thenReturn( + Array(Column.create("i", IntegerType), Column.create("x", StringType))) when(t.partitioning()).thenReturn(Array.empty[Transform]) t } @@ -76,53 +79,46 @@ class PlanResolutionSuite extends AnalysisTest { private val tableWithAcceptAnySchemaCapability: Table = { val t = mock(classOf[Table]) when(t.name()).thenReturn("v2TableWithAcceptAnySchemaCapability") - when(t.schema()).thenReturn(new StructType().add("i", "int")) + when(t.columns()).thenReturn(Array(Column.create("i", IntegerType))) when(t.capabilities()).thenReturn(Collections.singleton(TableCapability.ACCEPT_ANY_SCHEMA)) t } private val charVarcharTable: Table = { val t = mock(classOf[Table]) - when(t.schema()).thenReturn(new StructType().add("c1", "char(5)").add("c2", "varchar(5)")) + when(t.columns()).thenReturn( + Array(Column.create("c1", CharType(5)), Column.create("c2", VarcharType(5)))) when(t.partitioning()).thenReturn(Array.empty[Transform]) t } private val defaultValues: Table = { val t = mock(classOf[Table]) - when(t.schema()).thenReturn( - new StructType() - .add("i", BooleanType, true, - new MetadataBuilder() - .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "true") - .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "true").build()) - .add("s", IntegerType, true, - new MetadataBuilder() - .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "42") - .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "42").build())) + val default1 = new ColumnDefaultValue("true", LiteralValue(true, BooleanType)) + val default2 = new ColumnDefaultValue("42", LiteralValue(42, IntegerType)) + when(t.columns()).thenReturn(Array( + Column.create("i", BooleanType, true, null, default1, null), + Column.create("s", IntegerType, true, null, default2, null))) when(t.partitioning()).thenReturn(Array.empty[Transform]) t } private val defaultValues2: Table = { val t = mock(classOf[Table]) - when(t.schema()).thenReturn( - new StructType() - .add("i", StringType) - .add("e", StringType, true, - new MetadataBuilder() - .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "'abc'") - .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "'abc'").build())) + val default = new ColumnDefaultValue( + "'abc'", LiteralValue(UTF8String.fromString("abc"), StringType)) + when(t.columns()).thenReturn(Array( + Column.create("i", StringType), + Column.create("e", StringType, true, null, default, null))) when(t.partitioning()).thenReturn(Array.empty[Transform]) t } private val tableWithColumnNamedDefault: Table = { val t = mock(classOf[Table]) - when(t.schema()).thenReturn( - new StructType() - .add("s", StringType) - .add("default", 
StringType)) + when(t.columns()).thenReturn(Array( + Column.create("s", StringType), + Column.create("default", StringType))) when(t.partitioning()).thenReturn(Array.empty[Transform]) t } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/InMemoryTableMetricSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/InMemoryTableMetricSuite.scala index d0169bde40ff5..33e2fc46ccba4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/InMemoryTableMetricSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/InMemoryTableMetricSuite.scala @@ -23,6 +23,7 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.sql.QueryTest import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog} +import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.functions.lit import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.StructType @@ -51,7 +52,7 @@ class InMemoryTableMetricSuite testCatalog.createTable( Identifier.of(Array(), "table_name"), new StructType().add("i", "int"), - Array.empty, Collections.emptyMap[String, String]) + Array.empty[Transform], Collections.emptyMap[String, String]) func("testcat.table_name") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala index 2a441157f9da4..8f5996438e202 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.util.quoteIdentifier import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, NamespaceChange, SupportsNamespaces, TableCatalog, TableChange, V1Table} +import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType} import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -38,6 +39,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap abstract class V2SessionCatalogBaseSuite extends SharedSparkSession with BeforeAndAfter { val emptyProps: util.Map[String, String] = Collections.emptyMap[String, String] + val emptyTrans: Array[Transform] = Array.empty val schema: StructType = new StructType() .add("id", IntegerType) .add("data", StringType) @@ -95,13 +97,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(catalog.listTables(Array("ns")).isEmpty) - catalog.createTable(ident1, schema, Array.empty, emptyProps) + catalog.createTable(ident1, schema, emptyTrans, emptyProps) assert(catalog.listTables(Array("ns")).toSet == Set(ident1)) assert(catalog.listTables(Array("ns2")).isEmpty) - catalog.createTable(ident3, schema, Array.empty, emptyProps) - catalog.createTable(ident2, schema, Array.empty, emptyProps) + catalog.createTable(ident3, schema, emptyTrans, emptyProps) + catalog.createTable(ident2, schema, emptyTrans, emptyProps) assert(catalog.listTables(Array("ns")).toSet == Set(ident1, ident2)) 
assert(catalog.listTables(Array("ns2")).toSet == Set(ident3)) @@ -123,7 +125,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(!catalog.tableExists(testIdent)) - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name) assert(parsed == Seq("db", "test_table")) @@ -141,7 +143,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(!catalog.tableExists(testIdent)) - val table = catalog.createTable(testIdent, schema, Array.empty, properties) + val table = catalog.createTable(testIdent, schema, emptyTrans, properties) val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name) assert(parsed == Seq("db", "test_table")) @@ -156,13 +158,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(!catalog.tableExists(testIdent)) - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name) .map(part => quoteIdentifier(part)).mkString(".") val exc = intercept[TableAlreadyExistsException] { - catalog.createTable(testIdent, schema, Array.empty, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) } checkErrorTableAlreadyExists(exc, parsed) @@ -183,26 +185,26 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(!catalog.tableExists(testIdent)) // default location - val t1 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[V1Table] + val t1 = catalog.createTable(testIdent, schema, emptyTrans, properties).asInstanceOf[V1Table] assert(t1.catalogTable.location === spark.sessionState.catalog.defaultTablePath(testIdent.asTableIdentifier)) catalog.dropTable(testIdent) // relative path properties.put(TableCatalog.PROP_LOCATION, "relative/path") - val t2 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[V1Table] + val t2 = catalog.createTable(testIdent, schema, emptyTrans, properties).asInstanceOf[V1Table] assert(t2.catalogTable.location === makeQualifiedPathWithWarehouse("db.db/relative/path")) catalog.dropTable(testIdent) // absolute path without scheme properties.put(TableCatalog.PROP_LOCATION, "/absolute/path") - val t3 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[V1Table] + val t3 = catalog.createTable(testIdent, schema, emptyTrans, properties).asInstanceOf[V1Table] assert(t3.catalogTable.location.toString === "file:///absolute/path") catalog.dropTable(testIdent) // absolute path with scheme properties.put(TableCatalog.PROP_LOCATION, "file:/absolute/path") - val t4 = catalog.createTable(testIdent, schema, Array.empty, properties).asInstanceOf[V1Table] + val t4 = catalog.createTable(testIdent, schema, emptyTrans, properties).asInstanceOf[V1Table] assert(t4.catalogTable.location.toString === "file:/absolute/path") catalog.dropTable(testIdent) } @@ -212,7 +214,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(!catalog.tableExists(testIdent)) - catalog.createTable(testIdent, schema, Array.empty, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(catalog.tableExists(testIdent)) @@ -224,7 +226,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("loadTable") { val catalog = 
newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) val loaded = catalog.loadTable(testIdent) assert(table.name == loaded.name) @@ -245,7 +247,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("invalidateTable") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) catalog.invalidateTable(testIdent) val loaded = catalog.loadTable(testIdent) @@ -266,7 +268,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: add property") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(filterV2TableProperties(table.properties) == Map()) @@ -285,7 +287,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val properties = new util.HashMap[String, String]() properties.put("prop-1", "1") - val table = catalog.createTable(testIdent, schema, Array.empty, properties) + val table = catalog.createTable(testIdent, schema, emptyTrans, properties) assert(filterV2TableProperties(table.properties) == Map("prop-1" -> "1")) @@ -304,7 +306,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val properties = new util.HashMap[String, String]() properties.put("prop-1", "1") - val table = catalog.createTable(testIdent, schema, Array.empty, properties) + val table = catalog.createTable(testIdent, schema, emptyTrans, properties) assert(filterV2TableProperties(table.properties) == Map("prop-1" -> "1")) @@ -320,7 +322,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: remove missing property") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(filterV2TableProperties(table.properties) == Map()) @@ -336,7 +338,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: add top-level column") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(table.schema == schema) @@ -348,7 +350,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: add required column") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(table.schema == schema) @@ -361,7 +363,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: add column with comment") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(table.schema == schema) @@ -378,7 +380,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) val tableSchema = schema.add("point", pointStruct) - val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, tableSchema, 
emptyTrans, emptyProps) assert(table.schema == tableSchema) @@ -393,7 +395,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: add column to primitive field fails") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(table.schema == schema) @@ -411,7 +413,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: add field to missing column fails") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(table.schema == schema) @@ -427,7 +429,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: update column data type") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(table.schema == schema) @@ -443,7 +445,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val originalSchema = new StructType() .add("id", IntegerType, nullable = false) .add("data", StringType) - val table = catalog.createTable(testIdent, originalSchema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, originalSchema, emptyTrans, emptyProps) assert(table.schema == originalSchema) @@ -457,7 +459,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: update missing column fails") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(table.schema == schema) @@ -473,7 +475,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: add comment") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(table.schema == schema) @@ -489,7 +491,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: replace comment") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(table.schema == schema) @@ -508,7 +510,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: add comment to missing column fails") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(table.schema == schema) @@ -524,7 +526,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: rename top-level column") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(table.schema == schema) @@ -541,7 +543,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) val tableSchema = schema.add("point", pointStruct) - val table = catalog.createTable(testIdent, tableSchema, Array.empty, 
emptyProps) + val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) assert(table.schema == tableSchema) @@ -560,7 +562,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) val tableSchema = schema.add("point", pointStruct) - val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) assert(table.schema == tableSchema) @@ -576,7 +578,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: rename missing column fails") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(table.schema == schema) @@ -595,7 +597,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) val tableSchema = schema.add("point", pointStruct) - val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) assert(table.schema == tableSchema) @@ -612,7 +614,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: delete top-level column") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(table.schema == schema) @@ -629,7 +631,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) val tableSchema = schema.add("point", pointStruct) - val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) assert(table.schema == tableSchema) @@ -645,7 +647,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: delete missing column fails") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(table.schema == schema) @@ -667,7 +669,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) val tableSchema = schema.add("point", pointStruct) - val table = catalog.createTable(testIdent, tableSchema, Array.empty, emptyProps) + val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) assert(table.schema == tableSchema) @@ -698,7 +700,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(!catalog.tableExists(testIdent)) // default location - val t1 = catalog.createTable(testIdent, schema, Array.empty, emptyProps).asInstanceOf[V1Table] + val t1 = catalog.createTable(testIdent, schema, emptyTrans, emptyProps).asInstanceOf[V1Table] assert(t1.catalogTable.location === spark.sessionState.catalog.defaultTablePath(testIdent.asTableIdentifier)) @@ -723,7 +725,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(!catalog.tableExists(testIdent)) - catalog.createTable(testIdent, schema, Array.empty, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, 
emptyProps) assert(catalog.tableExists(testIdent)) @@ -750,7 +752,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(!catalog.tableExists(testIdent)) assert(!catalog.tableExists(testIdentNew)) - catalog.createTable(testIdent, schema, Array.empty, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(catalog.tableExists(testIdent)) catalog.renameTable(testIdent, testIdentNew) @@ -775,8 +777,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(!catalog.tableExists(testIdent)) assert(!catalog.tableExists(testIdentNew)) - catalog.createTable(testIdent, schema, Array.empty, emptyProps) - catalog.createTable(testIdentNew, schema, Array.empty, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdentNew, schema, emptyTrans, emptyProps) assert(catalog.tableExists(testIdent)) assert(catalog.tableExists(testIdentNew)) @@ -795,7 +797,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(!catalog.tableExists(testIdent)) assert(!catalog.tableExists(testIdentNewOtherDb)) - catalog.createTable(testIdent, schema, Array.empty, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) assert(catalog.tableExists(testIdent)) @@ -982,7 +984,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { assert(catalog.namespaceExists(testNs) === false) val exc = intercept[NoSuchDatabaseException] { - catalog.createTable(testIdent, schema, Array.empty, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) } assert(exc.getMessage.contains(testNs.quoted)) @@ -1016,7 +1018,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { val catalog = newCatalog() catalog.createNamespace(testNs, Map("property" -> "value").asJava) - catalog.createTable(testIdent, schema, Array.empty, emptyProps) + catalog.createTable(testIdent, schema, emptyTrans, emptyProps) val exc = intercept[AnalysisException] { catalog.dropNamespace(testNs, cascade = false) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala index 13f2a865936f5..ea1e9a7e0486f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala @@ -869,7 +869,7 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter |SORTED BY (s1) |INTO 200 BUCKETS |STORED AS PARQUET - """.stripMargin + |""".stripMargin } else { """ |CREATE TABLE test1( @@ -880,14 +880,14 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter |CLUSTERED BY (v1) |SORTED BY (s1) |INTO 200 BUCKETS - """.stripMargin + |""".stripMargin } val insertString = """ |INSERT INTO test1 |SELECT * FROM VALUES(1,1,1) - """.stripMargin + |""".stripMargin val dropString = "DROP TABLE IF EXISTS test1"