diff --git a/docs/spark-connector/spark-catalog-iceberg.md b/docs/spark-connector/spark-catalog-iceberg.md
index f5defef510a..70bede73cb8 100644
--- a/docs/spark-connector/spark-catalog-iceberg.md
+++ b/docs/spark-connector/spark-catalog-iceberg.md
@@ -6,6 +6,8 @@
 license: "Copyright 2024 Datastrato Pvt Ltd. This software is licensed under the Apache License version 2."
 ---
 
+The Gravitino Spark connector can read and write Iceberg tables, with the table metadata managed by the Gravitino server. To use the Iceberg catalog in the Spark connector, set the configuration `spark.sql.gravitino.enableIcebergSupport` to `true` and add the Iceberg Spark runtime jar to the Spark classpath.
+
 ## Capabilities
 
 #### Support DML and DDL operations:
diff --git a/docs/spark-connector/spark-connector.md b/docs/spark-connector/spark-connector.md
index 246b566ce1b..8776e900558 100644
--- a/docs/spark-connector/spark-connector.md
+++ b/docs/spark-connector/spark-connector.md
@@ -27,11 +27,12 @@ The Gravitino Spark connector leverages the Spark DataSourceV2 interface to faci
 1. [Build](../how-to-build.md) or download the Gravitino spark connector jar, and place it to the classpath of Spark.
 2. Configure the Spark session to use the Gravitino spark connector.
 
-| Property                     | Type   | Default Value | Description                                                                                          | Required | Since Version |
-|------------------------------|--------|---------------|------------------------------------------------------------------------------------------------------|----------|---------------|
-| spark.plugins                | string | (none)        | Gravitino spark plugin name, `com.datastrato.gravitino.spark.connector.plugin.GravitinoSparkPlugin` | Yes      | 0.5.0         |
-| spark.sql.gravitino.metalake | string | (none)        | The metalake name that spark connector used to request to Gravitino.                                | Yes      | 0.5.0         |
-| spark.sql.gravitino.uri      | string | (none)        | The uri of Gravitino server address.                                                                 | Yes      | 0.5.0         |
+| Property                                  | Type   | Default Value | Description                                                                                          | Required | Since Version |
+|-------------------------------------------|--------|---------------|------------------------------------------------------------------------------------------------------|----------|---------------|
+| spark.plugins                             | string | (none)        | Gravitino Spark plugin name, `com.datastrato.gravitino.spark.connector.plugin.GravitinoSparkPlugin` | Yes      | 0.5.0         |
+| spark.sql.gravitino.metalake              | string | (none)        | The metalake name the Spark connector uses when requesting metadata from Gravitino.                 | Yes      | 0.5.0         |
+| spark.sql.gravitino.uri                   | string | (none)        | The URI of the Gravitino server.                                                                     | Yes      | 0.5.0         |
+| spark.sql.gravitino.enableIcebergSupport  | string | `false`       | Set to `true` to enable Iceberg catalog support.                                                     | No       | 0.5.1         |
 
 ```shell
 ./bin/spark-sql -v \
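
As a quick orientation on how these properties fit together, here is a minimal sketch of building a `SparkSession` with the connector and the new flag, mirroring the pattern `SparkEnvIT` uses further down in this patch. The app name, server URI, and metalake name are placeholders, and it assumes the Gravitino Spark connector jar and the Iceberg Spark runtime jar are already on the Spark classpath.

```java
import org.apache.spark.sql.SparkSession;

// Minimal sketch: a Spark session using the Gravitino Spark connector with
// Iceberg support enabled. URI, metalake, and app name are placeholders.
public class GravitinoIcebergSessionExample {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder()
            .appName("gravitino-iceberg-example") // placeholder
            .config(
                "spark.plugins",
                "com.datastrato.gravitino.spark.connector.plugin.GravitinoSparkPlugin")
            .config("spark.sql.gravitino.uri", "http://localhost:8090") // placeholder
            .config("spark.sql.gravitino.metalake", "test_metalake") // placeholder
            .config("spark.sql.gravitino.enableIcebergSupport", "true")
            .getOrCreate();

    // Catalogs registered in the Gravitino metalake show up as Spark catalogs.
    spark.sql("SHOW CATALOGS").show();
    spark.stop();
  }
}
```
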
diff --git a/spark-connector/spark-connector-runtime/build.gradle.kts b/spark-connector/spark-connector-runtime/build.gradle.kts
index c9c1bbd9c1f..d60f4de402b 100644
--- a/spark-connector/spark-connector-runtime/build.gradle.kts
+++ b/spark-connector/spark-connector-runtime/build.gradle.kts
@@ -13,14 +13,11 @@ plugins {
 val scalaVersion: String = project.properties["scalaVersion"] as? String ?: extra["defaultScalaVersion"].toString()
 val sparkVersion: String = libs.versions.spark.get()
 val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".")
-val icebergVersion: String = libs.versions.iceberg.get()
 val baseName = "${rootProject.name}-spark-connector-runtime-${sparkMajorVersion}_$scalaVersion"
 
 dependencies {
   implementation(project(":clients:client-java-runtime", configuration = "shadow"))
   implementation(project(":spark-connector:spark-connector"))
-
-  implementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
 }
 
 tasks.withType(ShadowJar::class.java) {
diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/GravitinoSparkConfig.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/GravitinoSparkConfig.java
index 9c5e8e66b9f..681ed95dc44 100644
--- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/GravitinoSparkConfig.java
+++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/GravitinoSparkConfig.java
@@ -10,6 +10,8 @@ public class GravitinoSparkConfig {
   private static final String GRAVITINO_PREFIX = "spark.sql.gravitino.";
   public static final String GRAVITINO_URI = GRAVITINO_PREFIX + "uri";
   public static final String GRAVITINO_METALAKE = GRAVITINO_PREFIX + "metalake";
+  public static final String GRAVITINO_ENABLE_ICEBERG_SUPPORT =
+      GRAVITINO_PREFIX + "enableIcebergSupport";
   public static final String GRAVITINO_HIVE_METASTORE_URI = "metastore.uris";
   public static final String SPARK_HIVE_METASTORE_URI = "hive.metastore.uris";
 
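
The new constant simply appends `enableIcebergSupport` to the existing `spark.sql.gravitino.` prefix, so it resolves to the property key documented above. A small JUnit-style check, hypothetical and not part of this patch, pins that relationship down:

```java
package com.datastrato.gravitino.spark.connector;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

// Hypothetical test, not part of this patch: the new constant should resolve to
// the property key documented in spark-connector.md.
public class TestGravitinoSparkConfigKeys {

  @Test
  void testEnableIcebergSupportKey() {
    Assertions.assertEquals(
        "spark.sql.gravitino.enableIcebergSupport",
        GravitinoSparkConfig.GRAVITINO_ENABLE_ICEBERG_SUPPORT);
  }
}
```
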
diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java
index 3a80d7a6148..5a5e56daab2 100644
--- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java
+++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java
@@ -13,12 +13,14 @@
 import com.datastrato.gravitino.spark.connector.catalog.GravitinoCatalogManager;
 import com.datastrato.gravitino.spark.connector.hive.GravitinoHiveCatalog;
 import com.datastrato.gravitino.spark.connector.iceberg.GravitinoIcebergCatalog;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.plugin.DriverPlugin;
@@ -32,11 +34,16 @@
  * register Gravitino catalogs to Spark.
  */
 public class GravitinoDriverPlugin implements DriverPlugin {
+  private static final Logger LOG = LoggerFactory.getLogger(GravitinoDriverPlugin.class);
   private GravitinoCatalogManager catalogManager;
-  private static final String[] GRAVITINO_DRIVER_EXTENSIONS =
-      new String[] {IcebergSparkSessionExtensions.class.getName()};
+  private List<String> gravitinoDriverExtensions = new ArrayList<>();
+  private boolean enableIcebergSupport = false;
+
+  @VisibleForTesting
+  static final String ICEBERG_SPARK_EXTENSIONS =
+      "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions";
 
   @Override
   public Map<String, String> init(SparkContext sc, PluginContext pluginContext) {
@@ -52,7 +59,13 @@ public Map<String, String> init(SparkContext sc, PluginContext pluginContext) {
         String.format(
             "%s:%s, should not be empty", GravitinoSparkConfig.GRAVITINO_METALAKE, metalake));
 
-    catalogManager = GravitinoCatalogManager.create(gravitinoUri, metalake);
+    this.enableIcebergSupport =
+        conf.getBoolean(GravitinoSparkConfig.GRAVITINO_ENABLE_ICEBERG_SUPPORT, false);
+    if (enableIcebergSupport) {
+      gravitinoDriverExtensions.add(ICEBERG_SPARK_EXTENSIONS);
+    }
+
+    this.catalogManager = GravitinoCatalogManager.create(gravitinoUri, metalake);
     catalogManager.loadRelationalCatalogs();
     registerGravitinoCatalogs(conf, catalogManager.getCatalogs());
     registerSqlExtensions(conf);
@@ -75,6 +88,10 @@ private void registerGravitinoCatalogs(
       String catalogName = entry.getKey();
       Catalog gravitinoCatalog = entry.getValue();
       String provider = gravitinoCatalog.provider();
+      if ("lakehouse-iceberg".equals(provider.toLowerCase(Locale.ROOT))
+          && enableIcebergSupport == false) {
+        return;
+      }
       try {
         registerCatalog(sparkConf, catalogName, provider);
       } catch (Exception e) {
@@ -111,19 +128,20 @@ private void registerCatalog(SparkConf sparkConf, String catalogName, String pro
   }
 
   private void registerSqlExtensions(SparkConf conf) {
-    String gravitinoDriverExtensions = String.join(COMMA, GRAVITINO_DRIVER_EXTENSIONS);
+    String extensionString = String.join(COMMA, gravitinoDriverExtensions);
     if (conf.contains(StaticSQLConf.SPARK_SESSION_EXTENSIONS().key())) {
       String sparkSessionExtensions = conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS().key());
       if (StringUtils.isNotBlank(sparkSessionExtensions)) {
         conf.set(
             StaticSQLConf.SPARK_SESSION_EXTENSIONS().key(),
             removeDuplicateSparkExtensions(
-                GRAVITINO_DRIVER_EXTENSIONS, sparkSessionExtensions.split(COMMA)));
+                gravitinoDriverExtensions.toArray(new String[0]),
+                sparkSessionExtensions.split(COMMA)));
       } else {
-        conf.set(StaticSQLConf.SPARK_SESSION_EXTENSIONS().key(), gravitinoDriverExtensions);
+        conf.set(StaticSQLConf.SPARK_SESSION_EXTENSIONS().key(), extensionString);
       }
     } else {
-      conf.set(StaticSQLConf.SPARK_SESSION_EXTENSIONS().key(), gravitinoDriverExtensions);
+      conf.set(StaticSQLConf.SPARK_SESSION_EXTENSIONS().key(), extensionString);
     }
   }
 }
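
Since the runtime module no longer bundles `iceberg-spark-runtime` (see the build.gradle.kts change above), the plugin now names `IcebergSparkSessionExtensions` through a string constant rather than a class literal, so the connector can still load when the Iceberg jar is absent. The helper below is a hypothetical sketch, not part of this patch, of how a deployment could probe the classpath before turning `spark.sql.gravitino.enableIcebergSupport` on:

```java
// Hypothetical classpath probe (not part of this patch).
public final class IcebergRuntimeCheck {
  private static final String ICEBERG_SPARK_EXTENSIONS =
      "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions";

  private IcebergRuntimeCheck() {}

  /** Returns true if the Iceberg Spark runtime is on the classpath. */
  public static boolean icebergRuntimeOnClasspath() {
    try {
      Class.forName(ICEBERG_SPARK_EXTENSIONS);
      return true;
    } catch (ClassNotFoundException e) {
      return false;
    }
  }
}
```
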
diff --git a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/SparkEnvIT.java b/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/SparkEnvIT.java
index b285cb941b5..2e710d01b1d 100644
--- a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/SparkEnvIT.java
+++ b/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/SparkEnvIT.java
@@ -183,6 +183,7 @@ private void initSparkEnv() {
             .config("spark.plugins", GravitinoSparkPlugin.class.getName())
             .config(GravitinoSparkConfig.GRAVITINO_URI, gravitinoUri)
             .config(GravitinoSparkConfig.GRAVITINO_METALAKE, metalakeName)
+            .config(GravitinoSparkConfig.GRAVITINO_ENABLE_ICEBERG_SUPPORT, "true")
             .config("hive.exec.dynamic.partition.mode", "nonstrict")
             .config("spark.sql.warehouse.dir", warehouse)
             .config("spark.sql.session.timeZone", TIME_ZONE_UTC)
diff --git a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/plugin/TestGravitinoDriverPlugin.java b/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/plugin/TestGravitinoDriverPlugin.java
new file mode 100644
index 00000000000..624e16ebef2
--- /dev/null
+++ b/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/plugin/TestGravitinoDriverPlugin.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */
+
+package com.datastrato.gravitino.spark.connector.plugin;
+
+import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestGravitinoDriverPlugin {
+
+  @Test
+  void testIcebergExtensionName() {
+    Assertions.assertEquals(
+        IcebergSparkSessionExtensions.class.getName(),
+        GravitinoDriverPlugin.ICEBERG_SPARK_EXTENSIONS);
+  }
+}
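
With the flag on and a `lakehouse-iceberg` catalog defined in the metalake, the catalog is registered in Spark under its Gravitino catalog name, so standard Iceberg DDL and DML run through `spark.sql`. A short usage sketch continuing the `spark` session from the first example; `iceberg_catalog`, `db`, and `employees` are hypothetical names:

```java
// Continues the SparkSession `spark` from the earlier sketch; all names are hypothetical.
spark.sql("CREATE DATABASE IF NOT EXISTS iceberg_catalog.db");
spark.sql(
    "CREATE TABLE IF NOT EXISTS iceberg_catalog.db.employees (id INT, name STRING) USING iceberg");
spark.sql("INSERT INTO iceberg_catalog.db.employees VALUES (1, 'alice')");
spark.sql("SELECT * FROM iceberg_catalog.db.employees").show();
```
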