
[SPARK-42398][SQL] Refine default column value DS v2 interface #40049

Closed
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.analysis.{caseSensitiveResolution, Analyzer
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.connect.planner.SparkConnectPlanner
 import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, InMemoryCatalog}
+import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -67,16 +68,17 @@ class ProtoToParsedPlanTestSuite extends SparkFunSuite with SharedSparkSession {
 
   protected val inputFilePath: Path = baseResourcePath.resolve("queries")
   protected val goldenFilePath: Path = baseResourcePath.resolve("explain-results")
+  private val emptyProps: util.Map[String, String] = util.Collections.emptyMap()
 
   private val analyzer = {
     val inMemoryCatalog = new InMemoryCatalog
     inMemoryCatalog.initialize("primary", CaseInsensitiveStringMap.empty())
-    inMemoryCatalog.createNamespace(Array("tempdb"), util.Collections.emptyMap())
+    inMemoryCatalog.createNamespace(Array("tempdb"), emptyProps)
     inMemoryCatalog.createTable(
       Identifier.of(Array("tempdb"), "myTable"),
       new StructType().add("id", "long"),
-      Array.empty,
-      util.Collections.emptyMap())
+      Array.empty[Transform],
+      emptyProps)
 
     val catalogManager = new CatalogManager(
       inMemoryCatalog,
@@ -27,6 +27,7 @@ import org.apache.logging.log4j.Level
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException
 import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange}
+import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
 import org.apache.spark.sql.jdbc.DockerIntegrationFunSuite
 import org.apache.spark.sql.test.SharedSparkSession
@@ -118,7 +119,7 @@ private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerIntegrationFunSuite {
     // Drop non empty namespace without cascade
     catalog.createNamespace(Array("foo"), commentMap.asJava)
     assert(catalog.namespaceExists(Array("foo")) === true)
-    catalog.createTable(ident1, schema, Array.empty, emptyProps)
+    catalog.createTable(ident1, schema, Array.empty[Transform], emptyProps)
     if (supportsDropSchemaRestrict) {
       intercept[NonEmptyNamespaceException] {
         catalog.dropNamespace(Array("foo"), cascade = false)
@@ -0,0 +1,90 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.connector.catalog;

import java.util.Map;
import javax.annotation.Nullable;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.internal.connector.ColumnImpl;
import org.apache.spark.sql.types.DataType;

/**
 * An interface representing a column of a {@link Table}. It defines basic properties of a column,
 * such as name and data type, as well as some advanced ones like default column value.
 * <p>
 * Data sources do not need to implement it. They should consume it in APIs like
 * {@link TableCatalog#createTable(Identifier, Column[], Transform[], Map)}, and report it in
 * {@link Table#columns()} by calling the static {@code create} functions of this interface.
 */
@Evolving
public interface Column {

  static Column create(String name, DataType dataType) {
    return create(name, dataType, true);
  }

  static Column create(String name, DataType dataType, boolean nullable) {
    return create(name, dataType, nullable, null, null, null);
  }

  static Column create(
      String name,
      DataType dataType,
      boolean nullable,
      String comment,
      ColumnDefaultValue defaultValue,
      String metadataInJSON) {
    return new ColumnImpl(name, dataType, nullable, comment, defaultValue, metadataInJSON);
  }

  /**
   * Returns the name of this table column.
   */
  String name();

  /**
   * Returns the data type of this table column.
   */
  DataType dataType();

  /**
   * Returns true if this column may produce null values.
   */
  boolean nullable();

  /**
   * Returns the comment of this table column. Null means no comment.
   */
  @Nullable
  String comment();

  /**
   * Returns the default value of this table column. Null means no default value.
   */
  @Nullable
  ColumnDefaultValue defaultValue();

  /**
   * Returns the column metadata in JSON format.
   */
  @Nullable
  String metadataInJSON();
}
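The intended usage pattern, then, is for connectors to build Column instances through the static factories rather than implement the interface. Below is a minimal, hypothetical Scala sketch of a connector constructing the array it would return from Table#columns(); the table and column names are illustrative, not from this PR:

import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue}
import org.apache.spark.sql.connector.expressions.LiteralValue
import org.apache.spark.sql.types.{IntegerType, LongType}

// A plain non-nullable column, using the three-argument factory.
val idColumn = Column.create("id", LongType, false)

// A column declared as "status INT DEFAULT 42": its default value carries both
// the original SQL text and the evaluated literal.
val statusColumn = Column.create(
  "status",               // name
  IntegerType,            // data type
  true,                   // nullable
  "request status code",  // comment
  new ColumnDefaultValue("42", LiteralValue(42, IntegerType)),
  null)                   // no column metadata JSON

// A Table implementation would then report:
//   override def columns(): Array[Column] = Array(idColumn, statusColumn)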
@@ -0,0 +1,84 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.connector.catalog;

import java.util.Objects;
import javax.annotation.Nonnull;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.expressions.Literal;

/**
 * A class representing the default value of a column. It contains both the SQL string and literal
 * value of the user-specified default value expression. The SQL string should be re-evaluated for
 * each table writing command, which may produce different values if the default value expression is
 * something like {@code CURRENT_DATE()}. The literal value is used to back-fill existing data if
 * new columns with default values are added. Note: the back-fill can be lazy. The data sources can
 * remember the column default value and let the reader fill the column value when reading existing
 * data that does not have these new columns.
 */
@Evolving
public class ColumnDefaultValue {
  private String sql;
  private Literal<?> value;
Member:

The naming `value` is confusing. Shall we rename it to `initialValue`? Or change `sql` and `value` to `currentDefault` and `existingDefault`?

Contributor Author:

A default value has two parts: the SQL string and the evaluated literal value. I don't think `currentDefault` and `existingDefault` are easier for data source developers to understand.

Contributor Author:

Can you also read the classdoc? If you still think the name is confusing, let's figure out a better one.

Member:

Actually, I had read the classdoc before commenting... I don't have a better suggestion. Let's enhance the doc later.

Contributor (@dtenedor, Feb 17, 2023):

Data source developers only have to think about the existence default value. For any column where the corresponding field is not present in storage, the data source is responsible for filling in this value instead of NULL.

On the other hand, the current default value is for DML only. The analyzer inserts this expression for any explicit reference to DEFAULT, or for a small subset of implicit cases.

For these fields we could clarify with comments, e.g.

  // This is the original string contents of the SQL expression specified at the
  // time the column was created in a CREATE TABLE, REPLACE TABLE, or ALTER TABLE
  // ADD COLUMN command. For example, for "CREATE TABLE t (col INT DEFAULT 42)",
  // this field is equal to the string literal "42" (without quotation marks).
  private String sql;
  // This is the literal value corresponding to the above SQL string. For the above
  // example, this would be a literal integer with a value of 42.
  private Literal<?> value;

  public ColumnDefaultValue(String sql, Literal<?> value) {
    this.sql = sql;
    this.value = value;
  }

  /**
   * Returns the SQL string (Spark SQL dialect) of the default value expression. This is the
   * original string contents of the SQL expression specified at the time the column was created in
   * a CREATE TABLE, REPLACE TABLE, or ADD COLUMN command. For example, for
   * "CREATE TABLE t (col INT DEFAULT 40 + 2)", this returns the string literal "40 + 2" (without
   * quotation marks).
   */
  @Nonnull
  public String getSql() {
    return sql;
  }

  /**
   * Returns the default value literal. This is the literal value corresponding to
   * {@link #getSql()}. For the example in the doc of {@link #getSql()}, this returns a literal
   * integer with a value of 42.
   */
  @Nonnull
  public Literal<?> getValue() {
    return value;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof ColumnDefaultValue)) return false;
    ColumnDefaultValue that = (ColumnDefaultValue) o;
    return sql.equals(that.sql) && value.equals(that.value);
  }

  @Override
  public int hashCode() {
    return Objects.hash(sql, value);
  }

  @Override
  public String toString() {
    return "ColumnDefaultValue{sql='" + sql + "\', value=" + value + '}';
  }
}
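To tie the review thread above back to the API: a hypothetical column declared as "col INT DEFAULT 40 + 2" is modeled with the SQL text (the current default, re-evaluated by future writes) and the evaluated literal (the existence default, used to back-fill existing rows). A sketch, assuming Spark's LiteralValue implementation of Literal:

import org.apache.spark.sql.connector.catalog.ColumnDefaultValue
import org.apache.spark.sql.connector.expressions.LiteralValue
import org.apache.spark.sql.types.IntegerType

// For "CREATE TABLE t (col INT DEFAULT 40 + 2)":
val colDefault = new ColumnDefaultValue(
  "40 + 2",                       // getSql(): re-evaluated per writing command
  LiteralValue(42, IntegerType))  // getValue(): used to back-fill existing data

assert(colDefault.getSql == "40 + 2")
assert(colDefault.getValue.value == 42)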
@@ -54,6 +54,19 @@
 @Evolving
 public interface StagingTableCatalog extends TableCatalog {
 
+  /**
+   * Stage the creation of a table, preparing it to be committed into the metastore.
+   * <p>
+   * This is deprecated. Please override
+   * {@link #stageCreate(Identifier, Column[], Transform[], Map)} instead.
+   */
+  @Deprecated
+  StagedTable stageCreate(
+      Identifier ident,
+      StructType schema,
+      Transform[] partitions,
+      Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException;
+
   /**
    * Stage the creation of a table, preparing it to be committed into the metastore.
    * <p>
@@ -64,19 +77,34 @@ public interface StagingTableCatalog extends TableCatalog {
    * committed, an exception should be thrown by {@link StagedTable#commitStagedChanges()}.
    *
    * @param ident a table identifier
-   * @param schema the schema of the new table, as a struct type
+   * @param columns the columns of the new table
    * @param partitions transforms to use for partitioning data in the table
    * @param properties a string map of table properties
    * @return metadata for the new table
    * @throws TableAlreadyExistsException If a table or view already exists for the identifier
    * @throws UnsupportedOperationException If a requested partition transform is not supported
    * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
    */
-  StagedTable stageCreate(
+  default StagedTable stageCreate(
       Identifier ident,
-      StructType schema,
+      Column[] columns,
       Transform[] partitions,
-      Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException;
+      Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException {
+    return stageCreate(ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
+  }
 
+  /**
+   * Stage the replacement of a table, preparing it to be committed into the metastore when the
+   * returned table's {@link StagedTable#commitStagedChanges()} is called.
+   * <p>
+   * This is deprecated, please override
+   * {@link #stageReplace(Identifier, Column[], Transform[], Map)} instead.
+   */
+  @Deprecated
   StagedTable stageReplace(
       Identifier ident,
       StructType schema,
       Transform[] partitions,
-      Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException;
+      Map<String, String> properties) throws NoSuchNamespaceException, NoSuchTableException;
 
   /**
    * Stage the replacement of a table, preparing it to be committed into the metastore when the
@@ -97,19 +125,35 @@ StagedTable stageCreate(
    * operation.
    *
    * @param ident a table identifier
-   * @param schema the schema of the new table, as a struct type
+   * @param columns the columns of the new table
    * @param partitions transforms to use for partitioning data in the table
    * @param properties a string map of table properties
    * @return metadata for the new table
    * @throws UnsupportedOperationException If a requested partition transform is not supported
    * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
    * @throws NoSuchTableException If the table does not exist
    */
-  StagedTable stageReplace(
+  default StagedTable stageReplace(
       Identifier ident,
-      StructType schema,
+      Column[] columns,
       Transform[] partitions,
-      Map<String, String> properties) throws NoSuchNamespaceException, NoSuchTableException;
+      Map<String, String> properties) throws NoSuchNamespaceException, NoSuchTableException {
+    return stageReplace(
+        ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
+  }
 
+  /**
+   * Stage the creation or replacement of a table, preparing it to be committed into the metastore
+   * when the returned table's {@link StagedTable#commitStagedChanges()} is called.
+   * <p>
+   * This is deprecated, please override
+   * {@link #stageCreateOrReplace(Identifier, Column[], Transform[], Map)} instead.
+   */
+  @Deprecated
   StagedTable stageCreateOrReplace(
       Identifier ident,
       StructType schema,
       Transform[] partitions,
-      Map<String, String> properties) throws NoSuchNamespaceException, NoSuchTableException;
+      Map<String, String> properties) throws NoSuchNamespaceException;
 
   /**
    * Stage the creation or replacement of a table, preparing it to be committed into the metastore
@@ -129,16 +173,19 @@ StagedTable stageReplace(
    * the staged changes are committed but the table doesn't exist at commit time.
    *
    * @param ident a table identifier
-   * @param schema the schema of the new table, as a struct type
+   * @param columns the columns of the new table
    * @param partitions transforms to use for partitioning data in the table
    * @param properties a string map of table properties
    * @return metadata for the new table
    * @throws UnsupportedOperationException If a requested partition transform is not supported
    * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
    */
-  StagedTable stageCreateOrReplace(
+  default StagedTable stageCreateOrReplace(
       Identifier ident,
-      StructType schema,
+      Column[] columns,
       Transform[] partitions,
-      Map<String, String> properties) throws NoSuchNamespaceException;
+      Map<String, String> properties) throws NoSuchNamespaceException {
+    return stageCreateOrReplace(
+        ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties);
+  }
 }
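Because the new Column[] variants are default methods that funnel into the deprecated StructType ones via CatalogV2Util.v2ColumnsToStructType, an existing implementation compiles and behaves as before. A hypothetical sketch of such a legacy catalog (the class name and body are illustrative, not from this PR):

import java.util

import org.apache.spark.sql.connector.catalog.{Identifier, StagedTable, StagingTableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType

// A catalog written before this PR: it overrides only the deprecated
// StructType overload. Callers of the new Column[] overload still reach it,
// because the default method converts the columns to a StructType first.
abstract class LegacyStagingCatalog extends StagingTableCatalog {
  override def stageCreate(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String]): StagedTable = {
    // ... stage the table from the StructType schema, as before ...
    ???
  }
}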
@@ -51,9 +51,20 @@ public interface Table {
   /**
    * Returns the schema of this table. If the table is not readable and doesn't have a schema, an
    * empty schema can be returned here.
+   * <p>
+   * This is deprecated. Please override {@link #columns} instead.
    */
+  @Deprecated
   StructType schema();
 
+  /**
+   * Returns the columns of this table. If the table is not readable and doesn't have a schema, an
+   * empty array can be returned here.
+   */
+  default Column[] columns() {
+    return CatalogV2Util.structTypeToV2Columns(schema());
+  }
+
   /**
    * Returns the physical partitioning of this table.
    */
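Table#columns() follows the same compatibility pattern: an existing table that only implements schema() gets a derived Column[] for free, while new code can consume the richer column metadata. A small hypothetical sketch:

import org.apache.spark.sql.connector.catalog.{Column, Table, TableCapability}
import org.apache.spark.sql.types.StructType

// A pre-existing v2 table that knows nothing about the Column API. It does not
// override columns(), so the default implementation converts schema() via
// CatalogV2Util.structTypeToV2Columns, producing columns without comments or
// default values.
class LegacyTable extends Table {
  override def name(): String = "legacy_table"
  override def schema(): StructType = new StructType().add("id", "long")
  override def capabilities(): java.util.Set[TableCapability] =
    java.util.Collections.emptySet()
}

val cols: Array[Column] = new LegacyTable().columns()  // derived from schema()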