Skip to content

Commit

Permalink
Allow custom external catalogs (#127)
Browse files Browse the repository at this point in the history
  • Loading branch information
robert3005 authored Mar 18, 2017
1 parent ab03652 commit 8e5c2b1
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.ui.SQLListener
import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState}
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.internal.StaticSQLConf.{CATALOG_IMPLEMENTATION, SESSION_STATE_IMPLEMENTATION}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types.{DataType, LongType, StructType}
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.util.ExecutionListenerManager
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -805,6 +805,7 @@ object SparkSession {
*/
def enableHiveSupport(): Builder = synchronized {
if (hiveClassesArePresent) {
config(SESSION_STATE_IMPLEMENTATION.key, "hive")
config(CATALOG_IMPLEMENTATION.key, "hive")
} else {
throw new IllegalArgumentException(
Expand Down Expand Up @@ -964,9 +965,10 @@ object SparkSession {
private val HIVE_SESSION_STATE_CLASS_NAME = "org.apache.spark.sql.hive.HiveSessionState"

private def sessionStateClassName(conf: SparkConf): String = {
conf.get(CATALOG_IMPLEMENTATION) match {
conf.get(SESSION_STATE_IMPLEMENTATION) match {
case "hive" => HIVE_SESSION_STATE_CLASS_NAME
case "in-memory" => classOf[SessionState].getCanonicalName
case name => name
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,8 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
val table = r.tableMeta
val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
val cache = sparkSession.sessionState.catalog.tableRelationCache
val withHiveSupport =
sparkSession.sparkContext.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive"
val inMemory =
sparkSession.sparkContext.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "in-memory"

val plan = cache.get(qualifiedTableName, new Callable[LogicalPlan]() {
override def call(): LogicalPlan = {
Expand All @@ -232,7 +232,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
className = table.provider.get,
options = table.storage.properties ++ pathOption,
// TODO: improve `InMemoryCatalog` and remove this limitation.
catalogTable = if (withHiveSupport) Some(table) else None)
catalogTable = if (inMemory) None else Some(table))

LogicalRelation(
dataSource.resolveRelation(checkFilesExist = false),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1031,9 +1031,13 @@ object StaticSQLConf {
val CATALOG_IMPLEMENTATION = buildStaticConf("spark.sql.catalogImplementation")
.internal()
.stringConf
.checkValues(Set("hive", "in-memory"))
.createWithDefault("in-memory")

val SESSION_STATE_IMPLEMENTATION = buildStaticConf("spark.sql.sessionStateImplementation")
.internal()
.stringConf
.createWithDefault(CATALOG_IMPLEMENTATION.defaultValueString)

val GLOBAL_TEMP_DATABASE = buildStaticConf("spark.sql.globalTempDatabase")
.internal()
.stringConf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import org.apache.spark.sql.util.ExecutionListenerManager
/**
* A class that holds all session-specific state in a given [[SparkSession]].
*/
private[sql] class SessionState(sparkSession: SparkSession) {
class SessionState(sparkSession: SparkSession) {

// Note: These are all lazy vals because they depend on each other (e.g. conf) and we
// want subclasses to override some of the fields. Otherwise, we would get a lot of NPEs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ object SharedState {
conf.get(CATALOG_IMPLEMENTATION) match {
case "hive" => HIVE_EXTERNAL_CATALOG_CLASS_NAME
case "in-memory" => classOf[InMemoryCatalog].getCanonicalName
case name => name
}
}

Expand Down

0 comments on commit 8e5c2b1

Please sign in to comment.