[SPARK-6907][SQL] Isolated client for HiveMetastore #5851

Closed · wants to merge 2 commits

@@ -701,7 +701,7 @@ object SparkSubmit {
}

/** Provides utility functions to be used inside SparkSubmit. */
-private[deploy] object SparkSubmitUtils {
+private[spark] object SparkSubmitUtils {

  // Exposed for testing
  var printStream = SparkSubmit.printStream
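The only change above widens SparkSubmitUtils from private[deploy] to private[spark], presumably so the new isolated Hive client loader can reuse its Ivy-resolution support from another package. A hedged sketch, assuming the resolveMavenCoordinates(coordinates, remoteRepos, ivyPath) helper this object already carries; the Hive coordinate is purely illustrative:

import org.apache.spark.deploy.SparkSubmitUtils

// Assumed signature: resolveMavenCoordinates(coordinates, remoteRepos, ivyPath)
// resolves the artifacts through Ivy and returns a comma-separated list of
// local jar paths.
val jarPaths: String = SparkSubmitUtils.resolveMavenCoordinates(
  "org.apache.hive:hive-metastore:0.13.1",
  None, // remoteRepos: no extra repositories beyond the defaults
  None) // ivyPath: use the default ivy cache location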

@@ -27,6 +27,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
*/
class NoSuchTableException extends Exception

+class NoSuchDatabaseException extends Exception
+
/**
* An interface for looking up relations by name. Used by an [[Analyzer]].
*/
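The new NoSuchDatabaseException mirrors the existing NoSuchTableException. A minimal sketch of catching it, assuming a handle named client with the getDatabase semantics this PR defines later in ClientInterface:

import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException

try {
  client.getDatabase("missing_db") // hypothetical name; throws when absent
} catch {
  case _: NoSuchDatabaseException => println("database does not exist")
}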

@@ -17,12 +17,31 @@

package org.apache.spark.sql.catalyst

-import java.io.{PrintWriter, ByteArrayOutputStream, FileInputStream, File}
+import java.io._

import org.apache.spark.util.Utils

package object util {

+  /** Silences output to stderr or stdout for the duration of f */
+  def quietly[A](f: => A): A = {
+    val origErr = System.err
+    val origOut = System.out
+    try {
+      System.setErr(new PrintStream(new OutputStream {
+        def write(b: Int) = {}
+      }))
+      System.setOut(new PrintStream(new OutputStream {
+        def write(b: Int) = {}
+      }))
+
+      f
+    } finally {
+      System.setErr(origErr)
+      System.setOut(origOut)
+    }
+  }
+
  def fileToString(file: File, encoding: String = "UTF-8"): String = {
    val inStream = new FileInputStream(file)
    val outStream = new ByteArrayOutputStream
@@ -42,10 +61,9 @@ package object util {
    new String(outStream.toByteArray, encoding)
  }

-  def resourceToString(
-      resource:String,
-      encoding: String = "UTF-8",
-      classLoader: ClassLoader = Utils.getSparkClassLoader): String = {
+  def resourceToBytes(
+      resource: String,
+      classLoader: ClassLoader = Utils.getSparkClassLoader): Array[Byte] = {
    val inStream = classLoader.getResourceAsStream(resource)
    val outStream = new ByteArrayOutputStream
    try {
@@ -61,7 +79,14 @@
    finally {
      inStream.close()
    }
-    new String(outStream.toByteArray, encoding)
+    outStream.toByteArray
  }
+
+  def resourceToString(
+      resource: String,
+      encoding: String = "UTF-8",
+      classLoader: ClassLoader = Utils.getSparkClassLoader): String = {
+    new String(resourceToBytes(resource, classLoader), encoding)
+  }

  def stringToFile(file: File, str: String): File = {
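A short usage sketch of the two utilities changed above; the resource name is made up:

import org.apache.spark.sql.catalyst.util._

// quietly returns the block's result while discarding anything the block
// prints to stdout or stderr, which keeps noisy Hive output out of test logs.
val answer: Int = quietly {
  println("swallowed")
  42
}

// resourceToString is now a thin wrapper over resourceToBytes; both read the
// same classpath resource.
val raw: Array[Byte] = resourceToBytes("data/sample.txt")
val text: String = resourceToString("data/sample.txt")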
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive.client

import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}

case class HiveDatabase(
    name: String,
    location: String)

abstract class TableType { val name: String }
case object ExternalTable extends TableType { override val name = "EXTERNAL_TABLE" }
case object IndexTable extends TableType { override val name = "INDEX_TABLE" }
case object ManagedTable extends TableType { override val name = "MANAGED_TABLE" }
case object VirtualView extends TableType { override val name = "VIRTUAL_VIEW" }

case class HiveStorageDescriptor(
    location: String,
    inputFormat: String,
    outputFormat: String,
    serde: String)

case class HivePartition(
    values: Seq[String],
    storage: HiveStorageDescriptor)

case class HiveColumn(name: String, hiveType: String, comment: String)

case class HiveTable(
    specifiedDatabase: Option[String],
    name: String,
    schema: Seq[HiveColumn],
    partitionColumns: Seq[HiveColumn],
    properties: Map[String, String],
    serdeProperties: Map[String, String],
    tableType: TableType,
    location: Option[String] = None,
    inputFormat: Option[String] = None,
    outputFormat: Option[String] = None,
    serde: Option[String] = None) {

  @transient
  private[client] var client: ClientInterface = _

  private[client] def withClient(ci: ClientInterface): this.type = {
    client = ci
    this
  }

  def database: String = specifiedDatabase.getOrElse(sys.error("database not resolved"))

  def isPartitioned: Boolean = partitionColumns.nonEmpty

  def getAllPartitions: Seq[HivePartition] = client.getAllPartitions(this)

  // Hive does not support backticks when passing names to the client.
  def qualifiedName: String = s"$database.$name"
}

/**
* An externally visible interface to the Hive client. This interface is shared across both the
* internal and external classloaders for a given version of Hive and thus must expose only
* shared classes.
*/
trait ClientInterface {
  /**
   * Runs a HiveQL command using Hive, returning the results as a list of strings. Each row will
   * result in one string.
   */
  def runSqlHive(sql: String): Seq[String]

  /** Returns the names of all tables in the given database. */
  def listTables(dbName: String): Seq[String]

  /** Returns the name of the active database. */
  def currentDatabase: String

  /** Returns the metadata for the specified database, throwing an exception if it doesn't exist. */
  def getDatabase(name: String): HiveDatabase = {
    getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException)
  }

  /** Returns the metadata for a given database, or None if it doesn't exist. */
  def getDatabaseOption(name: String): Option[HiveDatabase]

  /** Returns the specified table, or throws [[NoSuchTableException]]. */
  def getTable(dbName: String, tableName: String): HiveTable = {
    getTableOption(dbName, tableName).getOrElse(throw new NoSuchTableException)
  }

  /** Returns the metadata for the specified table, or None if it doesn't exist. */
  def getTableOption(dbName: String, tableName: String): Option[HiveTable]

  /** Creates a table with the given metadata. */
  def createTable(table: HiveTable): Unit

  /** Updates the given table with new metadata. */
  def alterTable(table: HiveTable): Unit

  /** Creates a new database with the given name. */
  def createDatabase(database: HiveDatabase): Unit

  /** Returns all partitions for the given table. */
  def getAllPartitions(hTable: HiveTable): Seq[HivePartition]

  /** Loads a static partition into an existing table. */
  def loadPartition(
      loadPath: String,
      tableName: String,
      partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering
      replace: Boolean,
      holdDDLTime: Boolean,
      inheritTableSpecs: Boolean,
      isSkewedStoreAsSubdir: Boolean): Unit

  /** Loads data into an existing table. */
  def loadTable(
      loadPath: String, // TODO URI
      tableName: String,
      replace: Boolean,
      holdDDLTime: Boolean): Unit

  /** Loads new dynamic partitions into an existing table. */
  def loadDynamicPartitions(
      loadPath: String,
      tableName: String,
      partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering
      replace: Boolean,
      numDP: Int,
      holdDDLTime: Boolean,
      listBucketingEnabled: Boolean): Unit

  /** Used for testing only. Removes all metadata from this instance of Hive. */
  def reset(): Unit
}
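A hedged sketch of the read path through the interface above, assuming an implementation bound to the name client (the PR provides one behind an isolated classloader); only members defined on ClientInterface and HiveTable are used:

val db: HiveDatabase = client.getDatabase("default")

for (tableName <- client.listTables(db.name)) {
  val table: HiveTable = client.getTable(db.name, tableName)
  if (table.isPartitioned) {
    // getAllPartitions delegates back to the client attached via withClient.
    println(s"${table.qualifiedName}: ${table.getAllPartitions.size} partitions")
  }
}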