From ad1b0b19500685c3d25c9543403f4b76effe21e0 Mon Sep 17 00:00:00 2001
From: Robert Kruszewski
Date: Tue, 14 Mar 2017 15:13:03 -0400
Subject: [PATCH 1/4] open up catalog impl

---
 .../src/main/scala/org/apache/spark/sql/SparkSession.scala | 1 +
 .../sql/execution/datasources/DataSourceStrategy.scala     | 6 +++---
 .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 1 -
 .../scala/org/apache/spark/sql/internal/SharedState.scala  | 1 +
 4 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index afc1827e7eece..b9ec07153b82c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -967,6 +967,7 @@ object SparkSession {
     conf.get(CATALOG_IMPLEMENTATION) match {
       case "hive" => HIVE_SESSION_STATE_CLASS_NAME
       case "in-memory" => classOf[SessionState].getCanonicalName
+      case name => name
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index f694a0d6d724b..bc5734f36891c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -215,8 +215,8 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
     val table = r.tableMeta
     val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
     val cache = sparkSession.sessionState.catalog.tableRelationCache
-    val withHiveSupport =
-      sparkSession.sparkContext.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive"
+    val inMemory =
+      sparkSession.sparkContext.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "in-memory"
 
     val plan = cache.get(qualifiedTableName, new Callable[LogicalPlan]() {
       override def call(): LogicalPlan = {
@@ -232,7 +232,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
           className = table.provider.get,
           options = table.storage.properties ++ pathOption,
           // TODO: improve `InMemoryCatalog` and remove this limitation.
-          catalogTable = if (withHiveSupport) Some(table) else None)
+          catalogTable = if (inMemory) None else Some(table))
 
         LogicalRelation(
           dataSource.resolveRelation(checkFilesExist = false),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 71ac6645f3daa..e42bf47b63f3d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1031,7 +1031,6 @@ object StaticSQLConf {
   val CATALOG_IMPLEMENTATION = buildStaticConf("spark.sql.catalogImplementation")
     .internal()
     .stringConf
-    .checkValues(Set("hive", "in-memory"))
     .createWithDefault("in-memory")
 
   val GLOBAL_TEMP_DATABASE = buildStaticConf("spark.sql.globalTempDatabase")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index bce84de45c3d7..bfe4825307708 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -150,6 +150,7 @@ object SharedState {
     conf.get(CATALOG_IMPLEMENTATION) match {
       case "hive" => HIVE_EXTERNAL_CATALOG_CLASS_NAME
      case "in-memory" => classOf[InMemoryCatalog].getCanonicalName
+      case name => name
     }
   }
 

From 73274a0b6834ccc63bd12b2cac99a7dc09f204c3 Mon Sep 17 00:00:00 2001
From: Robert Kruszewski
Date: Tue, 14 Mar 2017 17:05:42 -0400
Subject: [PATCH 2/4] public session state

---
 .../main/scala/org/apache/spark/sql/internal/SessionState.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 69085605113ea..9668106a18f24 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.util.ExecutionListenerManager
 /**
  * A class that holds all session-specific state in a given [[SparkSession]].
  */
-private[sql] class SessionState(sparkSession: SparkSession) {
+class SessionState(sparkSession: SparkSession) {
 
   // Note: These are all lazy vals because they depend on each other (e.g. conf) and we
   // want subclasses to override some of the fields. Otherwise, we would get a lot of NPEs.
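Together, patches 1 and 2 open both extension points: the new `case name => name` arms let spark.sql.catalogImplementation carry an arbitrary fully qualified class name instead of only "hive" or "in-memory" (hence the dropped checkValues), and the now-public SessionState can be subclassed outside org.apache.spark.sql. A minimal sketch of what a downstream build could then do follows; CustomCatalog is hypothetical, and the sketch assumes SharedState reflectively instantiates the configured class with a (SparkConf, Configuration) constructor, as it does for InMemoryCatalog:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.InMemoryCatalog

// Hypothetical custom catalog. SharedState instantiates the configured class
// reflectively with (SparkConf, Configuration) constructor arguments, so any
// ExternalCatalog with that constructor shape can be plugged in; extending
// InMemoryCatalog keeps the sketch short.
class CustomCatalog(conf: SparkConf, hadoopConf: Configuration)
  extends InMemoryCatalog(conf, hadoopConf) {
  // override createTable, getTable, listTables, ... as needed
}

// With checkValues(Set("hive", "in-memory")) removed, this class name flows
// through SharedState.externalCatalogClassName via the new `case name => name`.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalogImplementation", classOf[CustomCatalog].getName)
  .getOrCreate()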
From 4d77fad6734c2acdb0eeb79872bcd64f75b1c834 Mon Sep 17 00:00:00 2001
From: Robert Kruszewski
Date: Wed, 15 Mar 2017 12:22:26 -0400
Subject: [PATCH 3/4] more configs

---
 .../src/main/scala/org/apache/spark/sql/SparkSession.scala | 7 ++++---
 .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 5 +++++
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index b9ec07153b82c..bc4cf92584b94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -40,10 +40,10 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.ui.SQLListener
 import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState}
-import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.internal.StaticSQLConf.{CATALOG_IMPLEMENTATION, SESSION_STATE_IMPLEMENTATION}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.streaming._
-import org.apache.spark.sql.types.{DataType, LongType, StructType}
+import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.util.ExecutionListenerManager
 import org.apache.spark.util.Utils
 
@@ -805,6 +805,7 @@ object SparkSession {
      */
     def enableHiveSupport(): Builder = synchronized {
       if (hiveClassesArePresent) {
+        config(SESSION_STATE_IMPLEMENTATION.key, "hive")
         config(CATALOG_IMPLEMENTATION.key, "hive")
       } else {
         throw new IllegalArgumentException(
@@ -964,7 +965,7 @@ object SparkSession {
   private val HIVE_SESSION_STATE_CLASS_NAME = "org.apache.spark.sql.hive.HiveSessionState"
 
   private def sessionStateClassName(conf: SparkConf): String = {
-    conf.get(CATALOG_IMPLEMENTATION) match {
+    conf.get(SESSION_STATE_IMPLEMENTATION) match {
       case "hive" => HIVE_SESSION_STATE_CLASS_NAME
       case "in-memory" => classOf[SessionState].getCanonicalName
       case name => name
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index e42bf47b63f3d..fe54de3de1bf2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1033,6 +1033,11 @@ object StaticSQLConf {
     .stringConf
     .createWithDefault("in-memory")
 
+  val SESSION_STATE_IMPLEMENTATION = buildStaticConf("spark.sql.sessionStateImplementation")
+    .internal()
+    .stringConf
+    .createWithDefault("in-memory")
+
   val GLOBAL_TEMP_DATABASE = buildStaticConf("spark.sql.globalTempDatabase")
     .internal()
     .stringConf

From dd0817460c52fb8b6988832b7d9041d2200997b6 Mon Sep 17 00:00:00 2001
From: Robert Kruszewski
Date: Wed, 15 Mar 2017 13:43:50 -0400
Subject: [PATCH 4/4] better defaults

---
 .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index fe54de3de1bf2..19e68c1e18b8e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1036,7 +1036,7 @@ object StaticSQLConf {
   val SESSION_STATE_IMPLEMENTATION = buildStaticConf("spark.sql.sessionStateImplementation")
     .internal()
     .stringConf
-    .createWithDefault("in-memory")
+    .createWithDefault(CATALOG_IMPLEMENTATION.defaultValueString)
 
   val GLOBAL_TEMP_DATABASE = buildStaticConf("spark.sql.globalTempDatabase")
     .internal()