diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index afc1827e7eece..bc4cf92584b94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -40,10 +40,10 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.ui.SQLListener
 import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState}
-import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.internal.StaticSQLConf.{CATALOG_IMPLEMENTATION, SESSION_STATE_IMPLEMENTATION}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.streaming._
-import org.apache.spark.sql.types.{DataType, LongType, StructType}
+import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.util.ExecutionListenerManager
 import org.apache.spark.util.Utils
 
@@ -805,6 +805,7 @@ object SparkSession {
      */
     def enableHiveSupport(): Builder = synchronized {
       if (hiveClassesArePresent) {
+        config(SESSION_STATE_IMPLEMENTATION.key, "hive")
         config(CATALOG_IMPLEMENTATION.key, "hive")
       } else {
         throw new IllegalArgumentException(
@@ -964,9 +965,10 @@ object SparkSession {
   private val HIVE_SESSION_STATE_CLASS_NAME = "org.apache.spark.sql.hive.HiveSessionState"
 
   private def sessionStateClassName(conf: SparkConf): String = {
-    conf.get(CATALOG_IMPLEMENTATION) match {
+    conf.get(SESSION_STATE_IMPLEMENTATION) match {
       case "hive" => HIVE_SESSION_STATE_CLASS_NAME
       case "in-memory" => classOf[SessionState].getCanonicalName
+      case name => name
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index f694a0d6d724b..bc5734f36891c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -215,8 +215,8 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
     val table = r.tableMeta
     val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
     val cache = sparkSession.sessionState.catalog.tableRelationCache
-    val withHiveSupport =
-      sparkSession.sparkContext.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive"
+    val inMemory =
+      sparkSession.sparkContext.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "in-memory"
 
     val plan = cache.get(qualifiedTableName, new Callable[LogicalPlan]() {
       override def call(): LogicalPlan = {
@@ -232,7 +232,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
             className = table.provider.get,
             options = table.storage.properties ++ pathOption,
             // TODO: improve `InMemoryCatalog` and remove this limitation.
-            catalogTable = if (withHiveSupport) Some(table) else None)
+            catalogTable = if (inMemory) None else Some(table))
 
         LogicalRelation(
           dataSource.resolveRelation(checkFilesExist = false),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 71ac6645f3daa..19e68c1e18b8e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1031,9 +1031,13 @@ object StaticSQLConf {
   val CATALOG_IMPLEMENTATION = buildStaticConf("spark.sql.catalogImplementation")
     .internal()
     .stringConf
-    .checkValues(Set("hive", "in-memory"))
     .createWithDefault("in-memory")
 
+  val SESSION_STATE_IMPLEMENTATION = buildStaticConf("spark.sql.sessionStateImplementation")
+    .internal()
+    .stringConf
+    .createWithDefault(CATALOG_IMPLEMENTATION.defaultValueString)
+
   val GLOBAL_TEMP_DATABASE = buildStaticConf("spark.sql.globalTempDatabase")
     .internal()
     .stringConf
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 69085605113ea..9668106a18f24 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.util.ExecutionListenerManager
 /**
  * A class that holds all session-specific state in a given [[SparkSession]].
  */
-private[sql] class SessionState(sparkSession: SparkSession) {
+class SessionState(sparkSession: SparkSession) {
 
   // Note: These are all lazy vals because they depend on each other (e.g. conf) and we
   // want subclasses to override some of the fields. Otherwise, we would get a lot of NPEs.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index bce84de45c3d7..bfe4825307708 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -150,6 +150,7 @@ object SharedState {
     conf.get(CATALOG_IMPLEMENTATION) match {
       case "hive" => HIVE_EXTERNAL_CATALOG_CLASS_NAME
       case "in-memory" => classOf[InMemoryCatalog].getCanonicalName
+      case name => name
     }
   }
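
For illustration, here is a minimal sketch, not part of the patch, of how the new
"case name => name" fallthrough in sessionStateClassName could be used to plug a
custom session state into a SparkSession. MySessionState and the example object
below are hypothetical names, and this assumes the patch above is applied.

// Sketch only, assuming this patch is applied. MySessionState is a
// hypothetical class name used purely for illustration.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SessionState

// SessionState is public after this patch, so it can be subclassed outside
// org.apache.spark.sql. The (SparkSession) constructor shape must be kept,
// since SparkSession instantiates the configured class reflectively with
// the session as the sole constructor argument.
class MySessionState(sparkSession: SparkSession) extends SessionState(sparkSession) {
  // Override selected lazy vals (conf, catalog, analyzer, ...) here as needed.
}

object CustomSessionStateExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("custom-session-state")
      // Any value other than "hive" or "in-memory" now falls through to
      // "case name => name" and is treated as a fully qualified class name.
      .config("spark.sql.sessionStateImplementation", classOf[MySessionState].getName)
      .getOrCreate()

    spark.range(10).show()
    spark.stop()
  }
}

Note that spark.sql.sessionStateImplementation defaults to the default value of
spark.sql.catalogImplementation ("in-memory"), not to its runtime value, so a
session that sets spark.sql.catalogImplementation directly without going through
enableHiveSupport() still gets the plain SessionState unless the new key is set
explicitly.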