Skip to content

Commit

Permalink
Spark 3.5: Support preserving schema nullability in CTAS and RTAS (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongyujiang authored Apr 12, 2024
1 parent 1e66657 commit 81b3310
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/docs/spark-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ Both catalogs are configured using properties nested under the catalog name. Com
| spark.sql.catalog._catalog-name_.cache.expiration-interval-ms | `30000` (30 seconds) | Duration after which cached catalog entries are expired; Only effective if `cache-enabled` is `true`. `-1` disables cache expiration and `0` disables caching entirely, irrespective of `cache-enabled`. Default is `30000` (30 seconds) |
| spark.sql.catalog._catalog-name_.table-default._propertyKey_ | | Default Iceberg table property value for property key _propertyKey_, which will be set on tables created by this catalog if not overridden |
| spark.sql.catalog._catalog-name_.table-override._propertyKey_ | | Enforced Iceberg table property value for property key _propertyKey_, which cannot be overridden by user |
| spark.sql.catalog._catalog-name_.use-nullable-query-schema | `true` or `false` | Whether to use a nullable copy of the query schema when creating a table with CTAS or RTAS. If set to `true`, all fields are marked as nullable. If set to `false`, the nullability of the query's fields is preserved. The default value is `true`. Available in Spark 3.5 and above. |

Additional properties can be found in common [catalog configuration](configuration.md#catalog-properties).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,25 @@
import org.apache.iceberg.spark.procedures.SparkProcedures;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.iceberg.spark.source.HasIcebergCatalog;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.iceberg.catalog.Procedure;
import org.apache.spark.sql.connector.iceberg.catalog.ProcedureCatalog;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

abstract class BaseCatalog
implements StagingTableCatalog,
ProcedureCatalog,
SupportsNamespaces,
HasIcebergCatalog,
SupportsFunctions {
// Catalog option controlling whether CTAS/RTAS should use a fully-nullable copy of the query schema.
private static final String USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS = "use-nullable-query-schema";
// Default is true: all fields are marked nullable unless the option is set to false.
private static final boolean USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS_DEFAULT = true;

// Set from catalog options in initialize(); exposed through useNullableQuerySchema().
private boolean useNullableQuerySchema = USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS_DEFAULT;

@Override
public Procedure loadProcedure(Identifier ident) throws NoSuchProcedureException {
Expand Down Expand Up @@ -66,6 +72,20 @@ public boolean isExistingNamespace(String[] namespace) {
return namespaceExists(namespace);
}

/**
 * Initializes this catalog from the given options.
 *
 * <p>Reads the {@code use-nullable-query-schema} option, which decides whether CTAS/RTAS mark all
 * query-schema fields as nullable; absent the option, the default ({@code true}) applies.
 */
@Override
public void initialize(String name, CaseInsensitiveStringMap options) {
  boolean nullableSchema =
      PropertyUtil.propertyAsBoolean(
          options, USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS, USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS_DEFAULT);
  this.useNullableQuerySchema = nullableSchema;
}

/**
 * Returns whether CTAS/RTAS should mark all fields of the query schema as nullable.
 *
 * <p>Controlled by the {@code use-nullable-query-schema} catalog option read during {@code
 * initialize}; defaults to {@code true}.
 */
@Override
public boolean useNullableQuerySchema() {
return useNullableQuerySchema;
}

// True only for the single-level "system" namespace (case-insensitive).
private static boolean isSystemNamespace(String[] namespace) {
  if (namespace.length != 1) {
    return false;
  }
  return namespace[0].equalsIgnoreCase("system");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,8 @@ public void renameView(Identifier fromIdentifier, Identifier toIdentifier)

@Override
public final void initialize(String name, CaseInsensitiveStringMap options) {
super.initialize(name, options);

this.cacheEnabled =
PropertyUtil.propertyAsBoolean(
options, CatalogProperties.CACHE_ENABLED, CatalogProperties.CACHE_ENABLED_DEFAULT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ public void renameTable(Identifier from, Identifier to)

@Override
public final void initialize(String name, CaseInsensitiveStringMap options) {
super.initialize(name, options);

if (options.containsKey(CatalogUtil.ICEBERG_CATALOG_TYPE)
&& options
.get(CatalogUtil.ICEBERG_CATALOG_TYPE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.util.concurrent.ThreadLocalRandom;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.Table;
Expand All @@ -33,6 +38,47 @@
import org.junit.jupiter.api.TestTemplate;

public class TestSparkCatalogOperations extends CatalogTestBase {
// Randomized once per JVM so both nullability code paths are exercised across runs; the chosen
// value is visible in the parameterized test name via the config parameter. Declared final since
// it is assigned exactly once at class initialization.
private static final boolean useNullableQuerySchema = ThreadLocalRandom.current().nextBoolean();

/**
 * Supplies the three catalog configurations (Hive, Hadoop, session catalog) under test, each
 * carrying the randomized {@code use-nullable-query-schema} flag.
 */
@Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
protected static Object[][] parameters() {
  // One flag value shared by all three configurations; surfaced in the test name through {2}.
  String nullableFlag = Boolean.toString(useNullableQuerySchema);

  Object[] hiveCatalog = {
    SparkCatalogConfig.HIVE.catalogName(),
    SparkCatalogConfig.HIVE.implementation(),
    ImmutableMap.of(
        "type", "hive",
        "default-namespace", "default",
        "use-nullable-query-schema", nullableFlag)
  };

  Object[] hadoopCatalog = {
    SparkCatalogConfig.HADOOP.catalogName(),
    SparkCatalogConfig.HADOOP.implementation(),
    ImmutableMap.of(
        "type", "hadoop",
        "cache-enabled", "false",
        "use-nullable-query-schema", nullableFlag)
  };

  Object[] sparkCatalog = {
    SparkCatalogConfig.SPARK.catalogName(),
    SparkCatalogConfig.SPARK.implementation(),
    ImmutableMap.of(
        "type", "hive",
        "default-namespace", "default",
        "parquet-enabled", "true",
        // Spark will delete tables using v1, leaving the cache out of sync
        "cache-enabled", "false",
        "use-nullable-query-schema", nullableFlag)
  };

  return new Object[][] {hiveCatalog, hadoopCatalog, sparkCatalog};
}

@BeforeEach
public void createTable() {
Expand Down Expand Up @@ -86,4 +132,60 @@ public void testInvalidateTable() {
sql("REFRESH TABLE %s", tableName);
sql("SELECT count(1) FROM %s", tableName);
}

/** Verifies that CTAS honors the catalog's use-nullable-query-schema setting. */
@TestTemplate
public void testCTASUseNullableQuerySchema() {
  // Seed the source table with a null and a non-null value in the data column.
  sql("INSERT INTO %s VALUES(1, 'abc'), (2, null)", tableName);

  String ctasTableName = tableName("ctas_table");
  sql("CREATE TABLE %s USING iceberg AS SELECT * FROM %s", ctasTableName, tableName);

  org.apache.iceberg.Table ctasTable =
      validationCatalog.loadTable(TableIdentifier.parse("default.ctas_table"));

  // When the flag is true all fields become optional; otherwise the query schema's
  // nullability is carried over unchanged.
  Types.NestedField expectedIdField =
      useNullableQuerySchema
          ? Types.NestedField.optional(1, "id", Types.LongType.get())
          : Types.NestedField.required(1, "id", Types.LongType.get());
  Schema expectedSchema =
      new Schema(expectedIdField, Types.NestedField.optional(2, "data", Types.StringType.get()));

  assertThat(ctasTable.schema().asStruct())
      .as("Should have expected schema")
      .isEqualTo(expectedSchema.asStruct());

  sql("DROP TABLE IF EXISTS %s", ctasTableName);
}

/** Verifies that RTAS honors the catalog's use-nullable-query-schema setting and keeps the data. */
@TestTemplate
public void testRTASUseNullableQuerySchema() {
  // Seed the source table with a null and a non-null value in the data column.
  sql("INSERT INTO %s VALUES(1, 'abc'), (2, null)", tableName);

  String rtasTableName = tableName("rtas_table");
  sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", rtasTableName);

  sql("REPLACE TABLE %s USING iceberg AS SELECT * FROM %s", rtasTableName, tableName);

  org.apache.iceberg.Table rtasTable =
      validationCatalog.loadTable(TableIdentifier.parse("default.rtas_table"));

  // When the flag is true the replaced table's id field is relaxed to optional; otherwise the
  // query schema's NOT NULL constraint survives the replace.
  Types.NestedField expectedIdField =
      useNullableQuerySchema
          ? Types.NestedField.optional(1, "id", Types.LongType.get())
          : Types.NestedField.required(1, "id", Types.LongType.get());
  Schema expectedSchema =
      new Schema(expectedIdField, Types.NestedField.optional(2, "data", Types.StringType.get()));

  assertThat(rtasTable.schema().asStruct())
      .as("Should have expected schema")
      .isEqualTo(expectedSchema.asStruct());

  assertEquals(
      "Should have rows matching the source table",
      sql("SELECT * FROM %s ORDER BY id", tableName),
      sql("SELECT * FROM %s ORDER BY id", rtasTableName));

  sql("DROP TABLE IF EXISTS %s", rtasTableName);
}
}

0 comments on commit 81b3310

Please sign in to comment.