Commit

WIP
marmbrus committed May 6, 2015
1 parent 4d8bf02 commit ab07f7e
Showing 10 changed files with 73 additions and 23 deletions.
2 changes: 2 additions & 0 deletions project/MimaExcludes.scala
@@ -89,6 +89,8 @@ object MimaExcludes {
        ProblemFilters.exclude[MissingMethodProblem](
          "org.apache.spark.mllib.linalg.Vector.numActives")
      ) ++ Seq(
+       // Execution should never be included as it's always internal.
+       MimaBuild.excludeSparkPackage("sql.execution"),
        // This `protected[sql]` method was removed in 1.3.1
        ProblemFilters.exclude[MissingMethodProblem](
          "org.apache.spark.sql.SQLContext.checkAnalysis"),
6 changes: 5 additions & 1 deletion project/SparkBuild.scala
@@ -193,6 +193,7 @@ object SparkBuild extends PomBuild {
   * Usage: `build/sbt sparkShell`
   */
  val sparkShell = taskKey[Unit]("start a spark-shell.")
+ val sparkSql = taskKey[Unit]("starts the spark sql CLI.")

  enable(Seq(
    connectInput in run := true,
@@ -203,6 +204,9 @@ object SparkBuild extends PomBuild {

    sparkShell := {
      (runMain in Compile).toTask(" org.apache.spark.repl.Main -usejavacp").value
-   }
+   },
+   sparkSql := {
+     (runMain in Compile).toTask(" org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver").value
+   }
  ))(assembly)

@@ -497,7 +501,7 @@ object TestSettings {
    // Setting SPARK_DIST_CLASSPATH is a simple way to make sure any child processes
    // launched by the tests have access to the correct test-time classpath.
    envVars in Test ++= Map(
      "SPARK_DIST_CLASSPATH" ->
        (fullClasspath in Test).value.files.map(_.getAbsolutePath).mkString(":").stripSuffix(":"),
      "JAVA_HOME" -> sys.env.get("JAVA_HOME").getOrElse(sys.props("java.home"))),
    javaOptions in Test += "-Dspark.test.home=" + sparkHome,
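Usage note: once this task is wired into the assembly-enabled settings above, the CLI should be launchable from the build in the same way as the REPL, i.e. `build/sbt sparkSql` alongside the documented `build/sbt sparkShell`.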
sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -215,7 +215,9 @@ class SQLContext(@transient val sparkContext: SparkContext)
  }

  sparkContext.getConf.getAll.foreach {
-   case (key, value) if key.startsWith("spark.sql") => setConf(key, value)
+   case (key, value) if key.startsWith("spark.sql") =>
+     println(s"$key=$value")
+     setConf(key, value)
    case _ =>
  }
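A minimal sketch of the resulting behavior (illustrative, not part of the commit; assumes a local master): any `spark.sql.*` entry on the SparkConf is now echoed to stdout and copied into the SQLContext's conf while the context is constructed.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    val conf = new SparkConf().setMaster("local").setAppName("conf-propagation")
      .set("spark.sql.shuffle.partitions", "10")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)  // prints: spark.sql.shuffle.partitions=10
    assert(sqlContext.getConf("spark.sql.shuffle.partitions") == "10")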

sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.{DataFrame, SQLConf, SQLContext}
 * A logical command that is executed for its side-effects. `RunnableCommand`s are
 * wrapped in `ExecutedCommand` during execution.
 */
-trait RunnableCommand extends LogicalPlan with logical.Command {
+private[sql] trait RunnableCommand extends LogicalPlan with logical.Command {
  self: Product =>

  override def output: Seq[Attribute] = Seq.empty
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -74,7 +74,16 @@ private[hive] object SparkSQLCLIDriver {
      System.exit(1)
    }

-   val sessionState = new CliSessionState(new HiveConf(classOf[SessionState]))
+   val localMetastore = {
+     val temp = Utils.createTempDir()
+     temp.delete()
+     temp
+   }
+   val cliConf = new HiveConf(classOf[SessionState])
+   // Override the location of the metastore since this is only used for local execution.
+   cliConf.set(
+     "javax.jdo.option.ConnectionURL", s"jdbc:derby:;databaseName=$localMetastore;create=true")
+   val sessionState = new CliSessionState(cliConf)

    sessionState.in = System.in
    try {
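Note on the hunk above: `Utils.createTempDir()` both creates the directory and registers it for cleanup, so the immediate `delete()` presumably leaves just a fresh, unique path that Derby can create itself (`create=true`). The resulting URL looks like `jdbc:derby:;databaseName=/tmp/spark-<uuid>;create=true` (path illustrative), keeping the CLI's local metastore away from any real one.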
@@ -92,9 +101,9 @@ private[hive] object SparkSQLCLIDriver {
    // Set all properties specified via command line.
    val conf: HiveConf = sessionState.getConf
    sessionState.cmdProperties.entrySet().foreach { item: java.util.Map.Entry[Object, Object] =>
-     conf.set(item.getKey.asInstanceOf[String], item.getValue.asInstanceOf[String])
-     sessionState.getOverriddenConfigurations.put(
-       item.getKey.asInstanceOf[String], item.getValue.asInstanceOf[String])
+     //conf.set(item.getKey.asInstanceOf[String], item.getValue.asInstanceOf[String])
+     //sessionState.getOverriddenConfigurations.put(
+     //  item.getKey.asInstanceOf[String], item.getValue.asInstanceOf[String])
    }

    SessionState.start(sessionState)
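Note: with the body commented out, properties passed on the command line still land in `sessionState.cmdProperties` but are no longer copied into the session's HiveConf in this WIP state.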
@@ -138,8 +147,9 @@ private[hive] object SparkSQLCLIDriver {
      case e: UnsupportedEncodingException => System.exit(3)
    }

-   // use the specified database if specified
-   cli.processSelectDatabase(sessionState);
+   if (sessionState.database != null) {
+     SparkSQLEnv.hiveContext.runSqlHive(s"USE ${sessionState.database}")
+   }

    // Execute -i init files (always in silent mode)
    cli.processInitFiles(sessionState)
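In effect: `cli.processSelectDatabase(sessionState)` would have selected the initial database through Hive's own CLI machinery; issuing `USE ${sessionState.database}` through `SparkSQLEnv.hiveContext.runSqlHive` instead appears to route the selection through Spark's Hive session.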
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
@@ -17,6 +17,8 @@

package org.apache.spark.sql.hive.thriftserver

+import java.io.PrintStream
+
import scala.collection.JavaConversions._

import org.apache.spark.scheduler.StatsReportListener
@@ -39,7 +41,6 @@ private[hive] object SparkSQLEnv extends Logging {

    sparkConf
      .setAppName(s"SparkSQL::${Utils.localHostName()}")
-     .set("spark.sql.hive.version", HiveShim.version)
      .set(
        "spark.serializer",
        maybeSerializer.getOrElse("org.apache.spark.serializer.KryoSerializer"))
@@ -51,6 +52,12 @@
    sparkContext.addSparkListener(new StatsReportListener())
    hiveContext = new HiveContext(sparkContext)

+   hiveContext.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8"))
+   hiveContext.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
+   hiveContext.metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))
+
+   hiveContext.setConf("spark.sql.hive.version", HiveShim.version)
+
    if (log.isDebugEnabled) {
      hiveContext.hiveconf.getAllProperties.toSeq.sorted.foreach { case (k, v) =>
        logDebug(s"HiveConf var: $k=$v")
sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -539,7 +539,7 @@ abstract class HiveThriftServer2Test extends FunSuite with BeforeAndAfterAll wit
    diagnosisBuffer.clear()

    // Retries up to 3 times with different port numbers if the server fails to start
-   (1 to 3).foldLeft(Try(startThriftServer(listeningPort, 0))) { case (started, attempt) =>
+   Seq.empty.foldLeft(Try(startThriftServer(listeningPort, 0))) { case (started, attempt) =>
      started.orElse {
        listeningPort += 1
        stopThriftServer()
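Note: folding over `Seq.empty` means the retry body never runs, so the up-to-3-retries behavior described in the comment is effectively disabled in this WIP commit; only the initial `startThriftServer(listeningPort, 0)` attempt remains.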
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -254,7 +254,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
  override def setConf(key: String, value: String): Unit = {
    super.setConf(key, value)
    hiveconf.set(key, value)
-   runSqlHive(s"SET $key=$value")
+   executionHive.runSqlHive(s"SET $key=$value")
+   metadataHive.runSqlHive(s"SET $key=$value")
  }

  /* A catalyst metadata catalog that points to the Hive Metastore. */
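A single call now fans out to both isolated Hive clients; a sketch of the effect (key and value illustrative):

    hiveContext.setConf("spark.sql.shuffle.partitions", "10")
    // -> super.setConf(...) and hiveconf.set(...), as before
    // -> executionHive.runSqlHive("SET spark.sql.shuffle.partitions=10")
    // -> metadataHive.runSqlHive("SET spark.sql.shuffle.partitions=10")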
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
@@ -17,6 +17,7 @@

package org.apache.spark.sql.hive.client

+import java.io.PrintStream
import java.util.{Map => JMap}

import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}
@@ -88,6 +89,10 @@ trait ClientInterface {
   */
  def runSqlHive(sql: String): Seq[String]

+ def setOut(stream: PrintStream): Unit
+ def setInfo(stream: PrintStream): Unit
+ def setError(stream: PrintStream): Unit
+
  /** Returns the names of all tables in the given database. */
  def listTables(dbName: String): Seq[String]

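These setters let a caller replace the Hive session's output, info, and error streams: `SparkSQLEnv` above uses them to point the metastore client at UTF-8 `PrintStream`s over `System.out`/`System.err`, and `ClientWrapper` below implements them against the live `SessionState`.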
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -62,12 +62,6 @@ class ClientWrapper(
    with Logging
    with ReflectionMagic {

-  private val conf = new HiveConf(classOf[SessionState])
-  config.foreach { case (k, v) =>
-    logDebug(s"Hive Config: $k=$v")
-    conf.set(k, v)
-  }
-
  // Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur.
  private val outputBuffer = new java.io.OutputStream {
    var pos: Int = 0
@@ -100,17 +94,30 @@ class ClientWrapper(
    val original = Thread.currentThread().getContextClassLoader
    Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
    val ret = try {
-     val newState = new SessionState(conf)
-     SessionState.start(newState)
-     newState.out = new PrintStream(outputBuffer, true, "UTF-8")
-     newState.err = new PrintStream(outputBuffer, true, "UTF-8")
-     newState
+     val oldState = SessionState.get()
+     if (oldState == null) {
+       val initialConf = new HiveConf(classOf[SessionState])
+       config.foreach { case (k, v) =>
+         logDebug(s"Hive Config: $k=$v")
+         initialConf.set(k, v)
+       }
+       val newState = new SessionState(initialConf)
+       SessionState.start(newState)
+       newState.out = new PrintStream(outputBuffer, true, "UTF-8")
+       newState.err = new PrintStream(outputBuffer, true, "UTF-8")
+       newState
+     } else {
+       oldState
+     }
    } finally {
      Thread.currentThread().setContextClassLoader(original)
    }
    ret
  }

+  /** Returns the configuration for the current session. */
+  def conf = SessionState.get().getConf
+
  private val client = Hive.get(conf)

  /**
@@ -134,6 +141,18 @@ class ClientWrapper(
    ret
  }

+  def setOut(stream: PrintStream): Unit = withHiveState {
+    state.out = stream
+  }
+
+  def setInfo(stream: PrintStream): Unit = withHiveState {
+    state.info = stream
+  }
+
+  def setError(stream: PrintStream): Unit = withHiveState {
+    state.err = stream
+  }
+
  override def currentDatabase: String = withHiveState {
    state.getCurrentDatabase
  }
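Taken together, `ClientWrapper` no longer builds its own `HiveConf` eagerly: a session is created (and configured from `config`) only when no `SessionState` is attached to the current thread, otherwise the existing one, such as the `CliSessionState` started by `SparkSQLCLIDriver`, is reused. `conf` becomes a `def` that reads the live session, so `Hive.get(conf)` binds to whichever session is current.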
