[SPARK-6907] [SQL] Isolated client for HiveMetastore
This PR adds initial support for loading multiple versions of Hive in a single JVM and provides a common interface for extracting metadata from the `HiveMetastoreClient` for a given version.  This is accomplished by creating an isolated `ClassLoader` that operates according to the following rules:

 - __Shared Classes__: Java, Scala, logging, and Spark classes are delegated to `baseClassLoader`,
   allowing the results of calls to the `ClientInterface` to be visible externally.
 - __Hive Classes__: new instances are loaded from `execJars`. These classes are not
   accessible externally due to their custom loading.
 - __Barrier Classes__: classes such as `ClientWrapper` are defined in Spark but must link to a
   specific version of Hive. As a result, the bytecode is acquired from the Spark `ClassLoader`
   but a new copy is created for each instance of `IsolatedClientLoader`. This new instance can
   see a specific version of Hive without using reflection wherever Hive is consistent across
   versions. Since this is a unique instance, it is not visible externally other than as a
   generic `ClientInterface`, unless `isolationOn` is set to `false`. A sketch of this
   delegation scheme appears below.
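
For illustration, here is a minimal sketch of the delegation scheme. The class name, the shared/barrier predicates, and the bytecode-reading code are simplified assumptions, not the implementation in this PR:

```scala
import java.net.{URL, URLClassLoader}

// Sketch only: the parent is null, so nothing is delegated implicitly.
class SketchedIsolatedLoader(
    hiveJars: Array[URL],
    baseClassLoader: ClassLoader)
  extends URLClassLoader(hiveJars, null) {

  // Shared classes: delegated so results stay visible to the rest of Spark.
  private def isShared(name: String): Boolean =
    name.startsWith("java.") || name.startsWith("scala.") ||
      name.startsWith("org.slf4j.") || name.startsWith("org.apache.spark.")

  // Barrier classes: defined in Spark but must link against this loader's Hive.
  private def isBarrier(name: String): Boolean =
    name.endsWith(".ClientWrapper")

  override def loadClass(name: String, resolve: Boolean): Class[_] =
    Option(findLoadedClass(name)).getOrElse {
      if (isBarrier(name)) {
        // Re-read the bytecode from the Spark loader and define a fresh copy
        // here, so its Hive references resolve inside this loader.
        val in = baseClassLoader.getResourceAsStream(name.replace('.', '/') + ".class")
        val bytes =
          try Iterator.continually(in.read()).takeWhile(_ != -1).map(_.toByte).toArray
          finally in.close()
        defineClass(name, bytes, 0, bytes.length)
      } else if (isShared(name)) {
        baseClassLoader.loadClass(name)
      } else {
        super.loadClass(name, resolve) // Hive classes come only from hiveJars
      }
    }
}
```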

In addition to the unit tests, I have also tested this locally against MySQL-backed instances of the Hive Metastore. I've also successfully ported Spark SQL to run with this client, but due to the size of the changes, that will come in a follow-up PR.

By default, Hive jars are currently downloaded from Maven automatically for a given version to ease packaging and testing. However, there is also support for specifying their location manually for deployments without internet access.
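
A hypothetical sketch of choosing between the two modes (the helper name, parameters, and Maven coordinate are illustrative assumptions, not the actual API; resolution itself is elided):

```scala
import java.io.File
import java.net.URL

// Hypothetical: decide where the isolated loader's Hive jars come from.
def hiveClientJars(version: String, manualJarDir: Option[File]): Seq[URL] =
  manualJarDir match {
    case Some(dir) =>
      // Offline deployments: point at a pre-populated directory of Hive jars.
      dir.listFiles().toSeq.filter(_.getName.endsWith(".jar")).map(_.toURI.toURL)
    case None =>
      // Default: resolve e.g. "org.apache.hive:hive-metastore:" + version (and
      // its transitive dependencies) from Maven; elided in this sketch.
      sys.error(s"would download Hive $version from Maven here")
  }
```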

Author: Michael Armbrust <[email protected]>

Closes apache#5851 from marmbrus/isolatedClient and squashes the following commits:

c72f6ac [Michael Armbrust] rxins comments
1e271fa [Michael Armbrust] [SPARK-6907][SQL] Isolated client for HiveMetastore
marmbrus authored and jeanlyn committed Jun 12, 2015
1 parent 5ca604c commit dc11482
Showing 9 changed files with 1,088 additions and 7 deletions.
@@ -701,7 +701,7 @@ object SparkSubmit {
}

/** Provides utility functions to be used inside SparkSubmit. */
-private[deploy] object SparkSubmitUtils {
+private[spark] object SparkSubmitUtils {

  // Exposed for testing
  var printStream = SparkSubmit.printStream
@@ -27,6 +27,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
*/
class NoSuchTableException extends Exception

class NoSuchDatabaseException extends Exception

/**
 * An interface for looking up relations by name. Used by an [[Analyzer]].
 */
@@ -17,12 +17,31 @@

package org.apache.spark.sql.catalyst

-import java.io.{PrintWriter, ByteArrayOutputStream, FileInputStream, File}
+import java.io._

import org.apache.spark.util.Utils

package object util {

  /** Silences output to stderr or stdout for the duration of f */
  def quietly[A](f: => A): A = {
    val origErr = System.err
    val origOut = System.out
    try {
      System.setErr(new PrintStream(new OutputStream {
        def write(b: Int) = {}
      }))
      System.setOut(new PrintStream(new OutputStream {
        def write(b: Int) = {}
      }))

      f
    } finally {
      System.setErr(origErr)
      System.setOut(origOut)
    }
  }

  def fileToString(file: File, encoding: String = "UTF-8"): String = {
    val inStream = new FileInputStream(file)
    val outStream = new ByteArrayOutputStream
@@ -42,10 +61,9 @@ package object util {
    new String(outStream.toByteArray, encoding)
  }

-  def resourceToString(
-      resource:String,
-      encoding: String = "UTF-8",
-      classLoader: ClassLoader = Utils.getSparkClassLoader): String = {
+  def resourceToBytes(
+      resource: String,
+      classLoader: ClassLoader = Utils.getSparkClassLoader): Array[Byte] = {
    val inStream = classLoader.getResourceAsStream(resource)
    val outStream = new ByteArrayOutputStream
    try {
@@ -61,7 +79,14 @@
    finally {
      inStream.close()
    }
-    new String(outStream.toByteArray, encoding)
+    outStream.toByteArray
  }

  def resourceToString(
      resource: String,
      encoding: String = "UTF-8",
      classLoader: ClassLoader = Utils.getSparkClassLoader): String = {
    new String(resourceToBytes(resource, classLoader), encoding)
  }

  def stringToFile(file: File, str: String): File = {
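
A quick illustration of the two helpers added above (the resource name is a made-up example, not from this commit):

```scala
// quietly swallows anything written to stdout/stderr while the block runs:
val answer = quietly {
  println("this is suppressed")
  42
}
// answer == 42, and nothing was printed.

// resourceToString is now a thin wrapper that decodes resourceToBytes:
val text = resourceToString("org/example/fixture.txt") // hypothetical resource
```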
@@ -0,0 +1,149 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive.client

import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}

case class HiveDatabase(
    name: String,
    location: String)

abstract class TableType { val name: String }
case object ExternalTable extends TableType { override val name = "EXTERNAL_TABLE" }
case object IndexTable extends TableType { override val name = "INDEX_TABLE" }
case object ManagedTable extends TableType { override val name = "MANAGED_TABLE" }
case object VirtualView extends TableType { override val name = "VIRTUAL_VIEW" }

case class HiveStorageDescriptor(
    location: String,
    inputFormat: String,
    outputFormat: String,
    serde: String)

case class HivePartition(
    values: Seq[String],
    storage: HiveStorageDescriptor)

case class HiveColumn(name: String, hiveType: String, comment: String)
case class HiveTable(
    specifiedDatabase: Option[String],
    name: String,
    schema: Seq[HiveColumn],
    partitionColumns: Seq[HiveColumn],
    properties: Map[String, String],
    serdeProperties: Map[String, String],
    tableType: TableType,
    location: Option[String] = None,
    inputFormat: Option[String] = None,
    outputFormat: Option[String] = None,
    serde: Option[String] = None) {

  @transient
  private[client] var client: ClientInterface = _

  private[client] def withClient(ci: ClientInterface): this.type = {
    client = ci
    this
  }

  def database: String = specifiedDatabase.getOrElse(sys.error("database not resolved"))

  def isPartitioned: Boolean = partitionColumns.nonEmpty

  def getAllPartitions: Seq[HivePartition] = client.getAllPartitions(this)

  // Hive does not support backticks when passing names to the client.
  def qualifiedName: String = s"$database.$name"
}
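
As an illustration of these case classes (the values are made up, not from this commit), a simple unpartitioned managed table could be described as:

```scala
// Illustrative metadata for an unpartitioned managed table.
val table = HiveTable(
  specifiedDatabase = Some("default"),
  name = "src",
  schema = Seq(
    HiveColumn("key", "int", comment = ""),
    HiveColumn("value", "string", comment = "")),
  partitionColumns = Seq.empty,
  properties = Map.empty,
  serdeProperties = Map.empty,
  tableType = ManagedTable)

assert(table.qualifiedName == "default.src")
assert(!table.isPartitioned)
```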

/**
 * An externally visible interface to the Hive client. This interface is shared across both the
 * internal and external classloaders for a given version of Hive and thus must expose only
 * shared classes.
 */
trait ClientInterface {
  /**
   * Runs a HiveQL command using Hive, returning the results as a list of strings. Each row will
   * result in one string.
   */
  def runSqlHive(sql: String): Seq[String]

  /** Returns the names of all tables in the given database. */
  def listTables(dbName: String): Seq[String]

  /** Returns the name of the active database. */
  def currentDatabase: String

  /** Returns the metadata for the specified database, throwing an exception if it doesn't exist. */
  def getDatabase(name: String): HiveDatabase = {
    getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException)
  }

  /** Returns the metadata for a given database, or None if it doesn't exist. */
  def getDatabaseOption(name: String): Option[HiveDatabase]

  /** Returns the specified table, or throws [[NoSuchTableException]]. */
  def getTable(dbName: String, tableName: String): HiveTable = {
    getTableOption(dbName, tableName).getOrElse(throw new NoSuchTableException)
  }

  /** Returns the metadata for the specified table, or None if it doesn't exist. */
  def getTableOption(dbName: String, tableName: String): Option[HiveTable]

  /** Creates a table with the given metadata. */
  def createTable(table: HiveTable): Unit

  /** Updates the given table with new metadata. */
  def alterTable(table: HiveTable): Unit

  /** Creates a new database with the given name. */
  def createDatabase(database: HiveDatabase): Unit

  /** Returns all partitions for the given table. */
  def getAllPartitions(hTable: HiveTable): Seq[HivePartition]

  /** Loads a static partition into an existing table. */
  def loadPartition(
      loadPath: String,
      tableName: String,
      partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering
      replace: Boolean,
      holdDDLTime: Boolean,
      inheritTableSpecs: Boolean,
      isSkewedStoreAsSubdir: Boolean): Unit

  /** Loads data into an existing table. */
  def loadTable(
      loadPath: String, // TODO URI
      tableName: String,
      replace: Boolean,
      holdDDLTime: Boolean): Unit

  /** Loads new dynamic partitions into an existing table. */
  def loadDynamicPartitions(
      loadPath: String,
      tableName: String,
      partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering
      replace: Boolean,
      numDP: Int,
      holdDDLTime: Boolean,
      listBucketingEnabled: Boolean): Unit

  /** Used for testing only. Removes all metadata from this instance of Hive. */
  def reset(): Unit
}
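
To make the shape of the API concrete, a hypothetical call site might look like this (how the `ClientInterface` instance is obtained is outside the scope of this file):

```scala
// Hypothetical usage: summarize a database through the version-agnostic API.
def summarize(client: ClientInterface, dbName: String): Unit = {
  val db = client.getDatabase(dbName) // throws NoSuchDatabaseException if absent
  println(s"database ${db.name} at ${db.location}")
  client.listTables(dbName).foreach { tableName =>
    val table = client.getTable(dbName, tableName)
    println(s"  ${table.qualifiedName}: ${table.tableType.name}, " +
      s"${table.partitionColumns.size} partition column(s)")
  }
}
```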