[#3396] feat(spark-connector): remove iceberg runtime from spark-connector jar (#3539)

### What changes were proposed in this pull request?
Remove the Iceberg runtime from the spark-connector jar.

### Why are the changes needed?

Fix: #3396 

### Does this PR introduce _any_ user-facing change?
Yes, documentation is added for the new `spark.sql.gravitino.enableIcebergSupport` configuration.

### How was this patch tested?
With `enableIcebergSupport` set to `true`, the Iceberg catalog can be used when the extra Iceberg runtime jars are on the classpath; tested with Iceberg 1.3, 1.4, and 1.5 jars.
With `enableIcebergSupport` unset, only the Hive catalog can be used.
FANNG1 authored and web-flow committed May 24, 2024
1 parent 4c7c86d commit c71a13e
Showing 7 changed files with 57 additions and 16 deletions.
2 changes: 2 additions & 0 deletions docs/spark-connector/spark-catalog-iceberg.md
@@ -6,6 +6,8 @@ license: "Copyright 2024 Datastrato Pvt Ltd. This software is licensed under the Apache License version 2."
---

The Gravitino Spark connector offers the capability to read and write Iceberg tables, with the metadata managed by the Gravitino server. To enable the Iceberg catalog in the Spark connector, set the configuration `spark.sql.gravitino.enableIcebergSupport` to `true` and add the Iceberg Spark runtime jar to the Spark classpath.
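
As an illustration only, here is a minimal sketch of a Spark session with Iceberg support enabled. The server URI, metalake name, and table name below are placeholder assumptions, and the Iceberg Spark runtime jar is assumed to already be on the classpath:

```java
import org.apache.spark.sql.SparkSession;

public class GravitinoIcebergExample {
  public static void main(String[] args) {
    // Sketch only: the URI, metalake, and table names below are hypothetical.
    SparkSession spark =
        SparkSession.builder()
            .appName("gravitino-iceberg-example")
            .config(
                "spark.plugins",
                "com.datastrato.gravitino.spark.connector.plugin.GravitinoSparkPlugin")
            .config("spark.sql.gravitino.uri", "http://localhost:8090")
            .config("spark.sql.gravitino.metalake", "test")
            .config("spark.sql.gravitino.enableIcebergSupport", "true")
            .getOrCreate();

    // Iceberg catalogs managed by Gravitino become usable as Spark catalogs.
    spark.sql("SELECT * FROM iceberg_catalog.db.example_table").show();
  }
}
```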

## Capabilities

#### Support DML and DDL operations:
11 changes: 6 additions & 5 deletions docs/spark-connector/spark-connector.md
@@ -27,11 +27,12 @@ The Gravitino Spark connector leverages the Spark DataSourceV2 interface to faci
1. [Build](../how-to-build.md) or download the Gravitino spark connector jar, and place it on the Spark classpath.
2. Configure the Spark session to use the Gravitino spark connector.

| Property                     | Type   | Default Value | Description                                                                                          | Required | Since Version |
|------------------------------|--------|---------------|------------------------------------------------------------------------------------------------------|----------|---------------|
| spark.plugins                | string | (none)        | Gravitino spark plugin name, `com.datastrato.gravitino.spark.connector.plugin.GravitinoSparkPlugin`   | Yes      | 0.5.0         |
| spark.sql.gravitino.metalake | string | (none)        | The metalake name the Spark connector uses to request Gravitino.                                      | Yes      | 0.5.0         |
| spark.sql.gravitino.uri      | string | (none)        | The URI of the Gravitino server.                                                                      | Yes      | 0.5.0         |
| Property                                  | Type   | Default Value | Description                                                                                          | Required | Since Version |
|-------------------------------------------|--------|---------------|--------------------------------------------------------------------------------------------------------|----------|---------------|
| spark.plugins                             | string | (none)        | Gravitino spark plugin name, `com.datastrato.gravitino.spark.connector.plugin.GravitinoSparkPlugin`     | Yes      | 0.5.0         |
| spark.sql.gravitino.metalake              | string | (none)        | The metalake name the Spark connector uses to request Gravitino.                                        | Yes      | 0.5.0         |
| spark.sql.gravitino.uri                   | string | (none)        | The URI of the Gravitino server.                                                                        | Yes      | 0.5.0         |
| spark.sql.gravitino.enableIcebergSupport  | string | `false`       | Set to `true` to enable the Iceberg catalog.                                                            | No       | 0.5.1         |

```shell
./bin/spark-sql -v \
```
3 changes: 0 additions & 3 deletions spark-connector/spark-connector-runtime/build.gradle.kts
@@ -13,14 +13,11 @@ plugins {
val scalaVersion: String = project.properties["scalaVersion"] as? String ?: extra["defaultScalaVersion"].toString()
val sparkVersion: String = libs.versions.spark.get()
val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".")
val icebergVersion: String = libs.versions.iceberg.get()
val baseName = "${rootProject.name}-spark-connector-runtime-${sparkMajorVersion}_$scalaVersion"

dependencies {
implementation(project(":clients:client-java-runtime", configuration = "shadow"))
implementation(project(":spark-connector:spark-connector"))

implementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
}

tasks.withType<ShadowJar>(ShadowJar::class.java) {
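
With the bundled dependency gone, the `IcebergSparkSessionExtensions` class is only present when users add an Iceberg Spark runtime jar themselves. Below is a hedged sketch of a fail-fast check an application could run before enabling the flag; the check is illustrative and not part of this commit:

```java
public class IcebergRuntimeCheck {
  public static void main(String[] args) {
    // Illustrative only: verify the Iceberg Spark runtime is on the classpath
    // before setting spark.sql.gravitino.enableIcebergSupport=true.
    try {
      Class.forName("org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions");
      System.out.println("Iceberg Spark runtime found; Iceberg support can be enabled.");
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(
          "spark.sql.gravitino.enableIcebergSupport=true requires an Iceberg Spark runtime jar",
          e);
    }
  }
}
```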
@@ -10,6 +10,8 @@ public class GravitinoSparkConfig {
private static final String GRAVITINO_PREFIX = "spark.sql.gravitino.";
public static final String GRAVITINO_URI = GRAVITINO_PREFIX + "uri";
public static final String GRAVITINO_METALAKE = GRAVITINO_PREFIX + "metalake";
public static final String GRAVITINO_ENABLE_ICEBERG_SUPPORT =
GRAVITINO_PREFIX + "enableIcebergSupport";
public static final String GRAVITINO_HIVE_METASTORE_URI = "metastore.uris";
public static final String SPARK_HIVE_METASTORE_URI = "hive.metastore.uris";

@@ -13,12 +13,14 @@
import com.datastrato.gravitino.spark.connector.catalog.GravitinoCatalogManager;
import com.datastrato.gravitino.spark.connector.hive.GravitinoHiveCatalog;
import com.datastrato.gravitino.spark.connector.iceberg.GravitinoIcebergCatalog;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.plugin.DriverPlugin;
@@ -32,11 +34,16 @@
* register Gravitino catalogs to Spark.
*/
public class GravitinoDriverPlugin implements DriverPlugin {

private static final Logger LOG = LoggerFactory.getLogger(GravitinoDriverPlugin.class);

private GravitinoCatalogManager catalogManager;
private static final String[] GRAVITINO_DRIVER_EXTENSIONS =
new String[] {IcebergSparkSessionExtensions.class.getName()};
private List<String> gravitinoDriverExtensions = new ArrayList<>();
private boolean enableIcebergSupport = false;

@VisibleForTesting
static final String ICEBERG_SPARK_EXTENSIONS =
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions";

@Override
public Map<String, String> init(SparkContext sc, PluginContext pluginContext) {
@@ -52,7 +59,13 @@ public Map<String, String> init(SparkContext sc, PluginContext pluginContext) {
String.format(
"%s:%s, should not be empty", GravitinoSparkConfig.GRAVITINO_METALAKE, metalake));

catalogManager = GravitinoCatalogManager.create(gravitinoUri, metalake);
this.enableIcebergSupport =
conf.getBoolean(GravitinoSparkConfig.GRAVITINO_ENABLE_ICEBERG_SUPPORT, false);
if (enableIcebergSupport) {
gravitinoDriverExtensions.add(ICEBERG_SPARK_EXTENSIONS);
}

this.catalogManager = GravitinoCatalogManager.create(gravitinoUri, metalake);
catalogManager.loadRelationalCatalogs();
registerGravitinoCatalogs(conf, catalogManager.getCatalogs());
registerSqlExtensions(conf);
@@ -75,6 +88,10 @@ private void registerGravitinoCatalogs(
String catalogName = entry.getKey();
Catalog gravitinoCatalog = entry.getValue();
String provider = gravitinoCatalog.provider();
if ("lakehouse-iceberg".equals(provider.toLowerCase(Locale.ROOT))
&& enableIcebergSupport == false) {
return;
}
try {
registerCatalog(sparkConf, catalogName, provider);
} catch (Exception e) {
@@ -111,19 +128,20 @@ private void registerCatalog(SparkConf sparkConf, String catalogName, String pro
}

private void registerSqlExtensions(SparkConf conf) {
String gravitinoDriverExtensions = String.join(COMMA, GRAVITINO_DRIVER_EXTENSIONS);
String extensionString = String.join(COMMA, gravitinoDriverExtensions);
if (conf.contains(StaticSQLConf.SPARK_SESSION_EXTENSIONS().key())) {
String sparkSessionExtensions = conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS().key());
if (StringUtils.isNotBlank(sparkSessionExtensions)) {
conf.set(
StaticSQLConf.SPARK_SESSION_EXTENSIONS().key(),
removeDuplicateSparkExtensions(
GRAVITINO_DRIVER_EXTENSIONS, sparkSessionExtensions.split(COMMA)));
gravitinoDriverExtensions.toArray(new String[0]),
sparkSessionExtensions.split(COMMA)));
} else {
conf.set(StaticSQLConf.SPARK_SESSION_EXTENSIONS().key(), gravitinoDriverExtensions);
conf.set(StaticSQLConf.SPARK_SESSION_EXTENSIONS().key(), extensionString);
}
} else {
conf.set(StaticSQLConf.SPARK_SESSION_EXTENSIONS().key(), gravitinoDriverExtensions);
conf.set(StaticSQLConf.SPARK_SESSION_EXTENSIONS().key(), extensionString);
}
}
}
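
`removeDuplicateSparkExtensions` is referenced above but not shown in this diff. The following is a minimal sketch of the merging behavior it needs, assuming it keeps Gravitino's extensions first and drops blanks and duplicates; the real helper in the codebase may differ in signature and details:

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

final class SparkExtensionsMerger {
  // Hypothetical sketch: merge Gravitino's extensions with user-configured
  // ones, preserving first-seen order and dropping blanks and duplicates.
  static String removeDuplicateSparkExtensions(
      String[] gravitinoExtensions, String[] userExtensions) {
    Set<String> merged = new LinkedHashSet<>(Arrays.asList(gravitinoExtensions));
    for (String extension : userExtensions) {
      String trimmed = extension.trim();
      if (!trimmed.isEmpty()) {
        merged.add(trimmed);
      }
    }
    return String.join(",", merged);
  }
}
```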
@@ -183,6 +183,7 @@ private void initSparkEnv() {
.config("spark.plugins", GravitinoSparkPlugin.class.getName())
.config(GravitinoSparkConfig.GRAVITINO_URI, gravitinoUri)
.config(GravitinoSparkConfig.GRAVITINO_METALAKE, metalakeName)
.config(GravitinoSparkConfig.GRAVITINO_ENABLE_ICEBERG_SUPPORT, "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.sql.warehouse.dir", warehouse)
.config("spark.sql.session.timeZone", TIME_ZONE_UTC)
@@ -0,0 +1,20 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.spark.connector.plugin;

import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class TestGravitinoDriverPlugin {

@Test
void testIcebergExtensionName() {
Assertions.assertEquals(
IcebergSparkSessionExtensions.class.getName(),
GravitinoDriverPlugin.ICEBERG_SPARK_EXTENSIONS);
}
}
