[SPARK-4309][SPARK-4407][SQL] Date type support for Thrift server, and fixes for complex types

SPARK-4407 was detected while working on SPARK-4309. Merged these two into a single PR since 1.2.0 RC is approaching.

Author: Cheng Lian <[email protected]>

Closes apache#3178 from liancheng/date-for-thriftserver and squashes the following commits:

6f71d0b [Cheng Lian] Makes toHiveString static
26fa955 [Cheng Lian] Fixes complex type support in Hive 0.13.1 shim
a92882a [Cheng Lian] Updates HiveShim for 0.13.1
73f442b [Cheng Lian] Adds Date support for HiveThriftServer2 (Hive 0.12.0)
liancheng authored and marmbrus committed Nov 16, 2014
1 parent 7850e0c commit cb6bd83
Showing 4 changed files with 142 additions and 115 deletions.
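
For reference, the behavior this commit enables can be exercised from any HiveServer2-compatible JDBC client, much as the updated test suite below does. A minimal sketch; the connection URL, credentials, and the table name `t` are illustrative assumptions, not part of this commit:

import java.sql.{Date, DriverManager}

object ThriftServerDateExample {
  def main(args: Array[String]): Unit = {
    // Same HiveServer2 JDBC driver that the test suite loads.
    Class.forName("org.apache.hive.jdbc.HiveDriver")

    // URL, user, and password are placeholders for a running HiveThriftServer2.
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
    val stmt = conn.createStatement()
    try {
      // Before SPARK-4309, a DateType column in the result set was unsupported;
      // with this commit it round-trips as java.sql.Date. `t` is any existing table.
      val rs = stmt.executeQuery("SELECT CAST('2011-01-01' AS DATE) FROM t LIMIT 1")
      rs.next()
      val date: Date = rs.getDate(1)
      println(s"date = $date")
    } finally {
      stmt.close()
      conn.close()
    }
  }
}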
@@ -19,9 +19,10 @@ package org.apache.spark.sql.hive.thriftserver

import java.io.File
import java.net.ServerSocket
-import java.sql.{DriverManager, Statement}
+import java.sql.{Date, DriverManager, Statement}
import java.util.concurrent.TimeoutException

+import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.concurrent.{Await, Promise}
@@ -51,6 +52,15 @@ import org.apache.spark.sql.hive.HiveShim
class HiveThriftServer2Suite extends FunSuite with Logging {
Class.forName(classOf[HiveDriver].getCanonicalName)

+  object TestData {
+    def getTestDataFilePath(name: String) = {
+      Thread.currentThread().getContextClassLoader.getResource(s"data/files/$name")
+    }
+
+    val smallKv = getTestDataFilePath("small_kv.txt")
+    val smallKvWithNull = getTestDataFilePath("small_kv_with_null.txt")
+  }

def randomListeningPort = {
// Let the system choose a random available port to avoid collisions with other
// parallel builds.
@@ -145,12 +155,8 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
}
}

-    val env = Seq(
-      // Resets SPARK_TESTING to avoid loading Log4J configurations in testing class paths
-      "SPARK_TESTING" -> "0",
-      // Prevents loading classes out of the assembly jar. Otherwise Utils.sparkVersion can't read
-      // proper version information from the jar manifest.
-      "SPARK_PREPEND_CLASSES" -> "")
+    // Resets SPARK_TESTING to avoid loading Log4J configurations in testing class paths
+    val env = Seq("SPARK_TESTING" -> "0")

Process(command, None, env: _*).run(ProcessLogger(
captureThriftServerOutput("stdout"),
@@ -194,15 +200,12 @@ class HiveThriftServer2Suite extends FunSuite with Logging {

test("Test JDBC query execution") {
withJdbcStatement() { statement =>
-      val dataFilePath =
-        Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
-
-      val queries =
-        s"""SET spark.sql.shuffle.partitions=3;
-           |CREATE TABLE test(key INT, val STRING);
-           |LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test;
-           |CACHE TABLE test;
-         """.stripMargin.split(";").map(_.trim).filter(_.nonEmpty)
+      val queries = Seq(
+        "SET spark.sql.shuffle.partitions=3",
+        "DROP TABLE IF EXISTS test",
+        "CREATE TABLE test(key INT, val STRING)",
+        s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test",
+        "CACHE TABLE test")

queries.foreach(statement.execute)

@@ -216,14 +219,10 @@

test("SPARK-3004 regression: result set containing NULL") {
withJdbcStatement() { statement =>
-      val dataFilePath =
-        Thread.currentThread().getContextClassLoader.getResource(
-          "data/files/small_kv_with_null.txt")
-
val queries = Seq(
"DROP TABLE IF EXISTS test_null",
"CREATE TABLE test_null(key INT, val STRING)",
-        s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test_null")
+        s"LOAD DATA LOCAL INPATH '${TestData.smallKvWithNull}' OVERWRITE INTO TABLE test_null")

queries.foreach(statement.execute)

@@ -270,24 +269,63 @@ class HiveThriftServer2Suite extends FunSuite with Logging {

test("SPARK-4292 regression: result set iterator issue") {
withJdbcStatement() { statement =>
-      val dataFilePath =
-        Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
-
val queries = Seq(
"DROP TABLE IF EXISTS test_4292",
"CREATE TABLE test_4292(key INT, val STRING)",
-        s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test_4292")
+        s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_4292")

queries.foreach(statement.execute)

val resultSet = statement.executeQuery("SELECT key FROM test_4292")

Seq(238, 86, 311, 27, 165).foreach { key =>
resultSet.next()
-        assert(resultSet.getInt(1) == key)
+        assert(resultSet.getInt(1) === key)
}

statement.executeQuery("DROP TABLE IF EXISTS test_4292")
}
}

+  test("SPARK-4309 regression: Date type support") {
+    withJdbcStatement() { statement =>
+      val queries = Seq(
+        "DROP TABLE IF EXISTS test_date",
+        "CREATE TABLE test_date(key INT, value STRING)",
+        s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_date")
+
+      queries.foreach(statement.execute)
+
+      assertResult(Date.valueOf("2011-01-01")) {
+        val resultSet = statement.executeQuery(
+          "SELECT CAST('2011-01-01' as date) FROM test_date LIMIT 1")
+        resultSet.next()
+        resultSet.getDate(1)
+      }
+    }
+  }
+
+  test("SPARK-4407 regression: Complex type support") {
+    withJdbcStatement() { statement =>
+      val queries = Seq(
+        "DROP TABLE IF EXISTS test_map",
+        "CREATE TABLE test_map(key INT, value STRING)",
+        s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map")
+
+      queries.foreach(statement.execute)
+
+      assertResult("""{238:"val_238"}""") {
+        val resultSet = statement.executeQuery("SELECT MAP(key, value) FROM test_map LIMIT 1")
+        resultSet.next()
+        resultSet.getString(1)
+      }
+
+      assertResult("""["238","val_238"]""") {
+        val resultSet = statement.executeQuery(
+          "SELECT ARRAY(CAST(key AS STRING), value) FROM test_map LIMIT 1")
+        resultSet.next()
+        resultSet.getString(1)
+      }
+    }
+  }
}
@@ -17,7 +17,7 @@

package org.apache.spark.sql.hive.thriftserver

-import java.sql.Timestamp
+import java.sql.{Date, Timestamp}
import java.util.{ArrayList => JArrayList, Map => JMap}

import scala.collection.JavaConversions._
@@ -131,14 +131,13 @@ private[hive] class SparkExecuteStatementOperation(
to.addColumnValue(ColumnValue.byteValue(from.getByte(ordinal)))
case ShortType =>
to.addColumnValue(ColumnValue.shortValue(from.getShort(ordinal)))
+      case DateType =>
+        to.addColumnValue(ColumnValue.dateValue(from(ordinal).asInstanceOf[Date]))
case TimestampType =>
to.addColumnValue(
ColumnValue.timestampValue(from.get(ordinal).asInstanceOf[Timestamp]))
case BinaryType | _: ArrayType | _: StructType | _: MapType =>
-        val hiveString = result
-          .queryExecution
-          .asInstanceOf[HiveContext#QueryExecution]
-          .toHiveString((from.get(ordinal), dataTypes(ordinal)))
+        val hiveString = HiveContext.toHiveString((from.get(ordinal), dataTypes(ordinal)))
to.addColumnValue(ColumnValue.stringValue(hiveString))
}
}
@@ -163,6 +162,8 @@ private[hive] class SparkExecuteStatementOperation(
to.addColumnValue(ColumnValue.byteValue(null))
case ShortType =>
to.addColumnValue(ColumnValue.shortValue(null))
+      case DateType =>
+        to.addColumnValue(ColumnValue.dateValue(null))
case TimestampType =>
to.addColumnValue(ColumnValue.timestampValue(null))
case BinaryType | _: ArrayType | _: StructType | _: MapType =>
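
Both shims now render BinaryType and complex-typed columns through the static HiveContext.toHiveString (made static in 6f71d0b) instead of reaching through a HiveContext#QueryExecution instance. As a rough illustration of the kind of recursive formatting such a helper performs, here is a simplified sketch that is consistent with the test expectations above ({238:"val_238"} and ["238","val_238"]); it is not the actual implementation:

// Simplified sketch only: the real HiveContext.toHiveString also handles
// structs, nulls, binary, and dates, and carries the DataType alongside the value.
def toHiveStringSketch(value: Any): String = value match {
  case seq: Seq[_] =>
    // ARRAY values render as ["elem1","elem2"]
    seq.map(toHiveStringSketch).mkString("[", ",", "]")
  case map: Map[_, _] =>
    // MAP values render as {key:value}
    map.map { case (k, v) => s"${toHiveStringSketch(k)}:${toHiveStringSketch(v)}" }
      .mkString("{", ",", "}")
  case s: String =>
    // Strings nested inside complex values are double-quoted
    "\"" + s + "\""
  case other =>
    other.toString
}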
@@ -18,7 +18,7 @@
package org.apache.spark.sql.hive.thriftserver

import java.security.PrivilegedExceptionAction
-import java.sql.Timestamp
+import java.sql.{Date, Timestamp}
import java.util.concurrent.Future
import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}

@@ -113,7 +113,7 @@ private[hive] class SparkExecuteStatementOperation(
def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int) {
dataTypes(ordinal) match {
case StringType =>
-        to += from.get(ordinal).asInstanceOf[String]
+        to += from.getString(ordinal)
case IntegerType =>
to += from.getInt(ordinal)
case BooleanType =>
@@ -123,33 +123,30 @@
case FloatType =>
to += from.getFloat(ordinal)
case DecimalType() =>
-        to += from.get(ordinal).asInstanceOf[BigDecimal].bigDecimal
+        to += from.getAs[BigDecimal](ordinal).bigDecimal
case LongType =>
to += from.getLong(ordinal)
case ByteType =>
to += from.getByte(ordinal)
case ShortType =>
to += from.getShort(ordinal)
+      case DateType =>
+        to += from.getAs[Date](ordinal)
case TimestampType =>
-        to += from.get(ordinal).asInstanceOf[Timestamp]
-      case BinaryType =>
-        to += from.get(ordinal).asInstanceOf[String]
-      case _: ArrayType =>
-        to += from.get(ordinal).asInstanceOf[String]
-      case _: StructType =>
-        to += from.get(ordinal).asInstanceOf[String]
-      case _: MapType =>
-        to += from.get(ordinal).asInstanceOf[String]
+        to += from.getAs[Timestamp](ordinal)
+      case BinaryType | _: ArrayType | _: StructType | _: MapType =>
+        val hiveString = HiveContext.toHiveString((from.get(ordinal), dataTypes(ordinal)))
+        to += hiveString
}
}

def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
-    val reultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, getProtocolVersion)
+    val resultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, getProtocolVersion)
if (!iter.hasNext) {
-      reultRowSet
+      resultRowSet
} else {
// maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int
val maxRows = maxRowsL.toInt
@@ -166,10 +163,10 @@
}
curCol += 1
}
-      reultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
+      resultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
curRow += 1
}
-      reultRowSet
+      resultRowSet
}
}

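
The Hive 0.13.1 shim above also replaces raw asInstanceOf casts with the typed Row.getAs[T] accessor (and fixes the reultRowSet typo). A small sketch of what the accessor change amounts to, assuming the Row factory and accessors available in this codebase; the row contents are illustrative:

import java.sql.Date
import org.apache.spark.sql.Row

// An illustrative row with an INT key column and a DATE column.
val row = Row(238, Date.valueOf("2011-01-01"))

val key: Int = row.getInt(0)                  // primitive accessor, as before
val oldStyle = row.get(1).asInstanceOf[Date]  // what the shim used to do
val newStyle = row.getAs[Date](1)             // equivalent cast, clearer intent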