Skip to content

Commit

Permalink
Allow custom external catalogs (#127)
Browse files Browse the repository at this point in the history
This adds a config that allows us to inject a custom session builder.
Internally we use it to build SparkSessions that are highly configured
beyond what Spark's built-in configs allow. Most importantly, that
includes building and registering our own session catalog (v1)
implementation with the SparkSession.

You can find how we use this config here [1] and our own
SessionStateBuilder here [2].

[1]: https://pl.ntr/1UU
[2]: https://pl.ntr/1UT

Co-authored-by: Robert Kruszewski <[email protected]>
Co-authored-by: Josh Casale <[email protected]>
Co-authored-by: Will Raschkowski <[email protected]>
  • Loading branch information
3 people committed Mar 3, 2021
1 parent 6476ac9 commit 665ab02
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,13 @@ object StaticSQLConf {
.internal()
.version("2.0.0")
.stringConf
.checkValues(Set("hive", "in-memory"))
.createWithDefault("in-memory")

val SESSION_STATE_IMPLEMENTATION = buildStaticConf("spark.sql.sessionStateImplementation")
.internal()
.stringConf
.createWithDefault(CATALOG_IMPLEMENTATION.defaultValueString)

val GLOBAL_TEMP_DATABASE = buildStaticConf("spark.sql.globalTempDatabase")
.internal()
.version("2.1.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.ExternalCommandExecutor
import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
import org.apache.spark.sql.internal._
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.internal.StaticSQLConf.{CATALOG_IMPLEMENTATION, SESSION_STATE_IMPLEMENTATION}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types.{DataType, StructType}
Expand Down Expand Up @@ -866,6 +866,7 @@ object SparkSession extends Logging {
*/
def enableHiveSupport(): Builder = synchronized {
if (hiveClassesArePresent) {
config(SESSION_STATE_IMPLEMENTATION.key, "hive")
config(CATALOG_IMPLEMENTATION.key, "hive")
} else {
throw new IllegalArgumentException(
Expand Down Expand Up @@ -1083,9 +1084,10 @@ object SparkSession extends Logging {
"org.apache.spark.sql.hive.HiveSessionStateBuilder"

private def sessionStateClassName(conf: SparkConf): String = {
conf.get(CATALOG_IMPLEMENTATION) match {
conf.get(SESSION_STATE_IMPLEMENTATION) match {
case "hive" => HIVE_SESSION_STATE_BUILDER_CLASS_NAME
case "in-memory" => classOf[SessionStateBuilder].getCanonicalName
case builder => builder
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ object SharedState extends Logging {
conf.get(CATALOG_IMPLEMENTATION) match {
case "hive" => HIVE_EXTERNAL_CATALOG_CLASS_NAME
case "in-memory" => classOf[InMemoryCatalog].getCanonicalName
case name => name
}
}

Expand Down

0 comments on commit 665ab02

Please sign in to comment.