Skip to content

Commit

Permalink
better error messages and jar handling
Browse files Browse the repository at this point in the history
  • Loading branch information
marmbrus committed May 6, 2015
1 parent e7b3941 commit 7e8f010
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ private[hive] class HiveQLDialect extends Dialect {
class HiveContext(sc: SparkContext) extends SQLContext(sc) {
self =>

import HiveContext._

/**
* When true, enables an experimental feature where metastore tables that use the parquet SerDe
* are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
Expand Down Expand Up @@ -103,18 +105,18 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
* Spark SQL for execution.
*/
protected[hive] def hiveMetastoreVersion: String =
getConf("spark.sql.hive.metastore.version", "0.13.1")
getConf(HIVE_METASTORE_VERSION, "0.13.1")

/**
* The location of the jars that should be used to instantiate the HiveMetastoreClient. This
* property can be one of three options:
* - a comma-separated list of jar files that could be passed to a URLClassLoader
* - a colon-separated list of jar files or directories for hive and hadoop.
* - builtin - attempt to discover the jars that were used to load Spark SQL and use those. This
* option is only valid when using the execution version of Hive.
* - maven - download the correct version of hive on demand from maven.
*/
protected[hive] def hiveMetastoreJars: String =
getConf("spark.sql.hive.metastore.jars", "builtin")
getConf(HIVE_METASTORE_JARS, "builtin")

@transient
protected[sql] lazy val substitutor = new VariableSubstitution()
Expand Down Expand Up @@ -173,8 +175,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
throw new IllegalArgumentException(
"Builtin jars can only be used when hive execution version == hive metastore version. " +
s"Execution: ${hiveExecutionVersion} != Metastore: ${hiveMetastoreVersion}. " +
"Specify a vaild path to the correct hive jars using spark.sql.hive.metastore.jars " +
s"or change spark.sql.hive.metastore.version to ${hiveExecutionVersion}.")
// The s-interpolator is required here; without it the literal text "$HIVE_METASTORE_JARS"
// would be shown to the user instead of the actual configuration key.
s"Specify a valid path to the correct hive jars using $HIVE_METASTORE_JARS " +
s"or change $HIVE_METASTORE_VERSION to $hiveExecutionVersion.")
}
val jars = getClass.getClassLoader match {
case urlClassLoader: java.net.URLClassLoader => urlClassLoader.getURLs
Expand All @@ -198,7 +200,17 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.")
IsolatedClientLoader.forVersion(hiveMetastoreVersion, allConfig )
} else {
val jars = hiveMetastoreJars.split(",").map(new java.net.URL(_))
// Convert each colon-separated entry to a file, expanding directories into their children.
val jars =
  hiveMetastoreJars
    .split(":")
    .map(new java.io.File(_))
    .flatMap {
      // listFiles() returns null when the directory cannot be read (I/O error, permissions,
      // or it vanished after the isDirectory check); treat that as an empty directory rather
      // than failing with a NullPointerException.
      case f if f.isDirectory => Option(f.listFiles()).map(_.toSeq).getOrElse(Seq.empty)
      case f => Seq(f)
    }
    .map(_.toURI.toURL)

logInfo(
s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using $jars")
new IsolatedClientLoader(
Expand Down Expand Up @@ -460,7 +472,10 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
}


private object HiveContext {
private[hive] object HiveContext {
val HIVE_METASTORE_VERSION: String = "spark.sql.hive.metastore.version"
val HIVE_METASTORE_JARS: String = "spark.sql.hive.metastore.jars"

protected val primitiveTypes =
Seq(StringType, IntegerType, LongType, DoubleType, FloatType, BooleanType, ByteType,
ShortType, DateType, TimestampType, BinaryType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.spark.Logging
import org.apache.spark.deploy.SparkSubmitUtils

import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.hive.HiveContext

/** Factory for `IsolatedClientLoader` with specific versions of hive. */
object IsolatedClientLoader {
Expand Down Expand Up @@ -166,6 +167,12 @@ class IsolatedClientLoader(
.getConstructors.head
.newInstance(version, config)
.asInstanceOf[ClientInterface]
} catch {
case ReflectionException(cnf: NoClassDefFoundError) =>
throw new ClassNotFoundException(
s"$cnf when creating Hive client using classpath: ${execJars.mkString(", ")}\n" +
"Please make sure that jars for your version of hive and hadoop are included in the " +
s"paths passed to ${HiveContext.HIVE_METASTORE_JARS}.")
} finally {
Thread.currentThread.setContextClassLoader(baseClassLoader)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ package org.apache.spark.sql.hive.client

import scala.reflect._

/**
 * Extractor that unwraps the cause carried by a reflective invocation failure.
 *
 * Matches only `java.lang.reflect.InvocationTargetException` and yields its cause; when the
 * cause is null, or the throwable is of any other type, nothing is extracted.
 */
private[client] object ReflectionException {
  def unapply(a: Throwable): Option[Throwable] =
    Option(a)
      .collect { case wrapped: java.lang.reflect.InvocationTargetException => wrapped.getCause }
      .flatMap(Option(_)) // a null cause must yield None, not Some(null)
}

/**
* Provides implicit functions on any object for calling methods reflectively.
*/
Expand Down

0 comments on commit 7e8f010

Please sign in to comment.