diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 6ec16f3d2111b..455a2ecf7f0ea 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -65,6 +65,8 @@ private[hive] class HiveQLDialect extends Dialect {
 class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   self =>
 
+  import HiveContext._
+
   /**
    * When true, enables an experimental feature where metastore tables that use the parquet SerDe
    * are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
@@ -103,18 +105,18 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
    * Spark SQL for execution.
    */
  protected[hive] def hiveMetastoreVersion: String =
-    getConf("spark.sql.hive.metastore.version", "0.13.1")
+    getConf(HIVE_METASTORE_VERSION, "0.13.1")
 
   /**
    * The location of the jars that should be used to instantiate the HiveMetastoreClient. This
    * property can be one of three options:
-   *  - a comma-separated list of jar files that could be passed to a URLClassLoader
+   *  - a colon-separated list of jar files or directories for hive and hadoop.
    *  - builtin - attempt to discover the jars that were used to load Spark SQL and use those. This
    *    option is only valid when using the execution version of Hive.
    *  - maven - download the correct version of hive on demand from maven.
    */
   protected[hive] def hiveMetastoreJars: String =
-    getConf("spark.sql.hive.metastore.jars", "builtin")
+    getConf(HIVE_METASTORE_JARS, "builtin")
 
   @transient
   protected[sql] lazy val substitutor = new VariableSubstitution()
@@ -173,8 +175,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
       throw new IllegalArgumentException(
         "Builtin jars can only be used when hive execution version == hive metastore version. " +
         s"Execution: ${hiveExecutionVersion} != Metastore: ${hiveMetastoreVersion}. " +
-        "Specify a vaild path to the correct hive jars using spark.sql.hive.metastore.jars " +
-        s"or change spark.sql.hive.metastore.version to ${hiveExecutionVersion}.")
+        s"Specify a valid path to the correct hive jars using $HIVE_METASTORE_JARS " +
+        s"or change $HIVE_METASTORE_VERSION to $hiveExecutionVersion.")
     }
     val jars = getClass.getClassLoader match {
       case urlClassLoader: java.net.URLClassLoader => urlClassLoader.getURLs
@@ -198,7 +200,17 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
         s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.")
       IsolatedClientLoader.forVersion(hiveMetastoreVersion, allConfig)
     } else {
-      val jars = hiveMetastoreJars.split(",").map(new java.net.URL(_))
+      // Convert to files and expand any directories.
+      val jars =
+        hiveMetastoreJars
+          .split(":")
+          .map(new java.io.File(_))
+          .flatMap {
+            case f if f.isDirectory => f.listFiles()
+            case f => f :: Nil
+          }
+          .map(_.toURI.toURL)
+
       logInfo(
         s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using $jars")
       new IsolatedClientLoader(
@@ -460,7 +472,10 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
 }
 
 
-private object HiveContext {
+private[hive] object HiveContext {
+  val HIVE_METASTORE_VERSION: String = "spark.sql.hive.metastore.version"
+  val HIVE_METASTORE_JARS: String = "spark.sql.hive.metastore.jars"
+
   protected val primitiveTypes =
     Seq(StringType, IntegerType, LongType, DoubleType, FloatType, BooleanType, ByteType,
       ShortType, DateType, TimestampType, BinaryType)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 13986822e895d..7e212e9fafae6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -30,6 +30,7 @@ import org.apache.spark.Logging
 import org.apache.spark.deploy.SparkSubmitUtils
 
 import org.apache.spark.sql.catalyst.util.quietly
+import org.apache.spark.sql.hive.HiveContext
 
 /** Factory for `IsolatedClientLoader` with specific versions of hive. */
 object IsolatedClientLoader {
@@ -166,6 +167,12 @@ class IsolatedClientLoader(
         .getConstructors.head
         .newInstance(version, config)
         .asInstanceOf[ClientInterface]
+    } catch {
+      case ReflectionException(cnf: NoClassDefFoundError) =>
+        throw new ClassNotFoundException(
+          s"$cnf when creating Hive client using classpath: ${execJars.mkString(", ")}\n" +
+          "Please make sure that jars for your version of hive and hadoop are included in the " +
+          s"paths passed to ${HiveContext.HIVE_METASTORE_JARS}.")
     } finally {
       Thread.currentThread.setContextClassLoader(baseClassLoader)
     }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ReflectionMagic.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ReflectionMagic.scala
index 90d03049356b5..c600b158c5460 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ReflectionMagic.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ReflectionMagic.scala
@@ -19,6 +19,14 @@ package org.apache.spark.sql.hive.client
 
 import scala.reflect._
 
+/** Unwraps reflection exceptions. */
+private[client] object ReflectionException {
+  def unapply(a: Throwable): Option[Throwable] = a match {
+    case ite: java.lang.reflect.InvocationTargetException => Option(ite.getCause)
+    case _ => None
+  }
+}
+
 /**
  * Provides implicit functions on any object for calling methods reflectively.
  */
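Note on the new jar-path handling: the sketch below (not part of the patch) restates the directory-expansion logic from HiveContext as a standalone, runnable Scala object, so the behavior of the colon-separated spark.sql.hive.metastore.jars value can be tried in isolation. The object name and example paths are hypothetical.

import java.io.File
import java.net.URL

object MetastoreJarsExpansion {
  // Split the colon-separated setting, expand directories into the files they
  // contain, and convert everything to URLs usable by a URLClassLoader.
  def expand(hiveMetastoreJars: String): Array[URL] =
    hiveMetastoreJars
      .split(":")
      .map(new File(_))
      .flatMap {
        case f if f.isDirectory => f.listFiles() // every file in the directory
        case f => f :: Nil                       // a single jar file
      }
      .map(_.toURI.toURL)

  def main(args: Array[String]): Unit = {
    // Hypothetical paths, for illustration only.
    expand("/opt/hive/lib:/opt/hadoop/hadoop-common.jar").foreach(println)
  }
}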
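Similarly, a minimal runnable sketch (also not part of the patch) of how the new ReflectionException extractor lets the catch block in IsolatedClientLoader match on the error wrapped inside a java.lang.reflect.InvocationTargetException. The demo object and the missing-class name are made up for the example.

object ReflectionExceptionDemo {
  // Same extractor as the one added to ReflectionMagic.scala above.
  object ReflectionException {
    def unapply(a: Throwable): Option[Throwable] = a match {
      case ite: java.lang.reflect.InvocationTargetException => Option(ite.getCause)
      case _ => None
    }
  }

  def main(args: Array[String]): Unit = {
    // Simulate reflective construction failing because a Hive class is missing.
    val wrapped = new java.lang.reflect.InvocationTargetException(
      new NoClassDefFoundError("org/apache/hadoop/hive/conf/HiveConf"))
    wrapped match {
      case ReflectionException(cnf: NoClassDefFoundError) =>
        println(s"Unwrapped: $cnf") // the cause, ready to be rethrown with a better message
      case other =>
        println(s"Not a wrapped reflection failure: $other")
    }
  }
}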