diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
index bba29b2bdca4d..23d12cbff3495 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
@@ -19,9 +19,10 @@ package org.apache.spark.sql.hive.thriftserver
 
 import java.io.File
 import java.net.ServerSocket
-import java.sql.{DriverManager, Statement}
+import java.sql.{Date, DriverManager, Statement}
 import java.util.concurrent.TimeoutException
 
+import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 import scala.concurrent.{Await, Promise}
@@ -51,6 +52,15 @@ import org.apache.spark.sql.hive.HiveShim
 class HiveThriftServer2Suite extends FunSuite with Logging {
   Class.forName(classOf[HiveDriver].getCanonicalName)
 
+  object TestData {
+    def getTestDataFilePath(name: String) = {
+      Thread.currentThread().getContextClassLoader.getResource(s"data/files/$name")
+    }
+
+    val smallKv = getTestDataFilePath("small_kv.txt")
+    val smallKvWithNull = getTestDataFilePath("small_kv_with_null.txt")
+  }
+
   def randomListeningPort = {
     // Let the system to choose a random available port to avoid collision with other parallel
     // builds.
@@ -145,12 +155,8 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
       }
     }
 
-    val env = Seq(
-      // Resets SPARK_TESTING to avoid loading Log4J configurations in testing class paths
-      "SPARK_TESTING" -> "0",
-      // Prevents loading classes out of the assembly jar. Otherwise Utils.sparkVersion can't read
-      // proper version information from the jar manifest.
-      "SPARK_PREPEND_CLASSES" -> "")
+    // Resets SPARK_TESTING to avoid loading Log4J configurations in testing class paths
+    val env = Seq("SPARK_TESTING" -> "0")
 
     Process(command, None, env: _*).run(ProcessLogger(
       captureThriftServerOutput("stdout"),
@@ -194,15 +200,12 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
 
   test("Test JDBC query execution") {
     withJdbcStatement() { statement =>
-      val dataFilePath =
-        Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
-
-      val queries =
-        s"""SET spark.sql.shuffle.partitions=3;
-           |CREATE TABLE test(key INT, val STRING);
-           |LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test;
-           |CACHE TABLE test;
-         """.stripMargin.split(";").map(_.trim).filter(_.nonEmpty)
+      val queries = Seq(
+        "SET spark.sql.shuffle.partitions=3",
+        "DROP TABLE IF EXISTS test",
+        "CREATE TABLE test(key INT, val STRING)",
+        s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test",
+        "CACHE TABLE test")
 
       queries.foreach(statement.execute)
 
@@ -216,14 +219,10 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
 
   test("SPARK-3004 regression: result set containing NULL") {
     withJdbcStatement() { statement =>
-      val dataFilePath =
-        Thread.currentThread().getContextClassLoader.getResource(
-          "data/files/small_kv_with_null.txt")
-
       val queries = Seq(
         "DROP TABLE IF EXISTS test_null",
         "CREATE TABLE test_null(key INT, val STRING)",
-        s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test_null")
+        s"LOAD DATA LOCAL INPATH '${TestData.smallKvWithNull}' OVERWRITE INTO TABLE test_null")
 
       queries.foreach(statement.execute)
 
@@ -270,13 +269,10 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
 
   test("SPARK-4292 regression: result set iterator issue") {
     withJdbcStatement() { statement =>
-      val dataFilePath =
-        Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
-
       val queries = Seq(
        "DROP TABLE IF EXISTS test_4292",
        "CREATE TABLE test_4292(key INT, val STRING)",
-        s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test_4292")
+        s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_4292")
 
       queries.foreach(statement.execute)
 
@@ -284,10 +280,52 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
 
       Seq(238, 86, 311, 27, 165).foreach { key =>
         resultSet.next()
-        assert(resultSet.getInt(1) == key)
+        assert(resultSet.getInt(1) === key)
       }
 
       statement.executeQuery("DROP TABLE IF EXISTS test_4292")
     }
   }
+
+  test("SPARK-4309 regression: Date type support") {
+    withJdbcStatement() { statement =>
+      val queries = Seq(
+        "DROP TABLE IF EXISTS test_date",
+        "CREATE TABLE test_date(key INT, value STRING)",
+        s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_date")
+
+      queries.foreach(statement.execute)
+
+      assertResult(Date.valueOf("2011-01-01")) {
+        val resultSet = statement.executeQuery(
+          "SELECT CAST('2011-01-01' as date) FROM test_date LIMIT 1")
+        resultSet.next()
+        resultSet.getDate(1)
+      }
+    }
+  }
+
+  test("SPARK-4407 regression: Complex type support") {
+    withJdbcStatement() { statement =>
+      val queries = Seq(
+        "DROP TABLE IF EXISTS test_map",
+        "CREATE TABLE test_map(key INT, value STRING)",
+        s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map")
+
+      queries.foreach(statement.execute)
+
+      assertResult("""{238:"val_238"}""") {
+        val resultSet = statement.executeQuery("SELECT MAP(key, value) FROM test_map LIMIT 1")
+        resultSet.next()
+        resultSet.getString(1)
+      }
+
+      assertResult("""["238","val_238"]""") {
+        val resultSet = statement.executeQuery(
+          "SELECT ARRAY(CAST(key AS STRING), value) FROM test_map LIMIT 1")
+        resultSet.next()
+        resultSet.getString(1)
+      }
+    }
+  }
 }
diff --git a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
index aa2e3cab72bb9..9258ad0cdf1d0 100644
--- a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
+++ b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
-import java.sql.Timestamp
+import java.sql.{Date, Timestamp}
 import java.util.{ArrayList => JArrayList, Map => JMap}
 
 import scala.collection.JavaConversions._
@@ -131,14 +131,13 @@ private[hive] class SparkExecuteStatementOperation(
         to.addColumnValue(ColumnValue.byteValue(from.getByte(ordinal)))
       case ShortType =>
         to.addColumnValue(ColumnValue.shortValue(from.getShort(ordinal)))
+      case DateType =>
+        to.addColumnValue(ColumnValue.dateValue(from(ordinal).asInstanceOf[Date]))
       case TimestampType =>
         to.addColumnValue(
           ColumnValue.timestampValue(from.get(ordinal).asInstanceOf[Timestamp]))
       case BinaryType | _: ArrayType | _: StructType | _: MapType =>
-        val hiveString = result
-          .queryExecution
-          .asInstanceOf[HiveContext#QueryExecution]
-          .toHiveString((from.get(ordinal), dataTypes(ordinal)))
+        val hiveString = HiveContext.toHiveString((from.get(ordinal), dataTypes(ordinal)))
         to.addColumnValue(ColumnValue.stringValue(hiveString))
     }
   }
@@ -163,6 +162,8 @@ private[hive] class SparkExecuteStatementOperation(
         to.addColumnValue(ColumnValue.byteValue(null))
       case ShortType =>
         to.addColumnValue(ColumnValue.shortValue(null))
+      case DateType =>
+        to.addColumnValue(ColumnValue.dateValue(null))
       case TimestampType =>
         to.addColumnValue(ColumnValue.timestampValue(null))
       case BinaryType | _: ArrayType | _: StructType | _: MapType =>
diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
index a642478d08857..3c7f62af450d9 100644
--- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
+++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hive.thriftserver
 
 import java.security.PrivilegedExceptionAction
-import java.sql.Timestamp
+import java.sql.{Date, Timestamp}
 import java.util.concurrent.Future
 import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
 
@@ -113,7 +113,7 @@ private[hive] class SparkExecuteStatementOperation(
   def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int) {
     dataTypes(ordinal) match {
       case StringType =>
-        to += from.get(ordinal).asInstanceOf[String]
+        to += from.getString(ordinal)
       case IntegerType =>
         to += from.getInt(ordinal)
       case BooleanType =>
@@ -123,23 +123,20 @@ private[hive] class SparkExecuteStatementOperation(
       case FloatType =>
         to += from.getFloat(ordinal)
      case DecimalType() =>
-        to += from.get(ordinal).asInstanceOf[BigDecimal].bigDecimal
+        to += from.getAs[BigDecimal](ordinal).bigDecimal
       case LongType =>
         to += from.getLong(ordinal)
       case ByteType =>
         to += from.getByte(ordinal)
       case ShortType =>
         to += from.getShort(ordinal)
+      case DateType =>
+        to += from.getAs[Date](ordinal)
       case TimestampType =>
-        to += from.get(ordinal).asInstanceOf[Timestamp]
-      case BinaryType =>
-        to += from.get(ordinal).asInstanceOf[String]
-      case _: ArrayType =>
-        to += from.get(ordinal).asInstanceOf[String]
-      case _: StructType =>
-        to += from.get(ordinal).asInstanceOf[String]
-      case _: MapType =>
-        to += from.get(ordinal).asInstanceOf[String]
+        to += from.getAs[Timestamp](ordinal)
+      case BinaryType | _: ArrayType | _: StructType | _: MapType =>
+        val hiveString = HiveContext.toHiveString((from.get(ordinal), dataTypes(ordinal)))
+        to += hiveString
     }
   }
 
@@ -147,9 +144,9 @@ private[hive] class SparkExecuteStatementOperation(
     validateDefaultFetchOrientation(order)
     assertState(OperationState.FINISHED)
     setHasResultSet(true)
-    val reultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, getProtocolVersion)
+    val resultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, getProtocolVersion)
     if (!iter.hasNext) {
-      reultRowSet
+      resultRowSet
     } else {
       // maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int
       val maxRows = maxRowsL.toInt
@@ -166,10 +163,10 @@ private[hive] class SparkExecuteStatementOperation(
         }
         curCol += 1
       }
-      reultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
+      resultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
       curRow += 1
     }
-    reultRowSet
+    resultRowSet
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index e88afaaf001c0..feed64fe4cd6f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -19,36 +19,27 @@ package org.apache.spark.sql.hive
 
 import java.io.{BufferedReader, File, InputStreamReader, PrintStream}
 import java.sql.{Date, Timestamp}
-import java.util.{ArrayList => JArrayList}
-
-import org.apache.hadoop.hive.common.`type`.HiveDecimal
-import org.apache.spark.sql.catalyst.types.DecimalType
-import org.apache.spark.sql.catalyst.types.decimal.Decimal
 
 import scala.collection.JavaConversions._
 import scala.language.implicitConversions
-import scala.reflect.runtime.universe.{TypeTag, typeTag}
+import scala.reflect.runtime.universe.TypeTag
 
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.metadata.Table
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.hive.serde2.io.TimestampWritable
-import org.apache.hadoop.hive.serde2.io.DateWritable
+import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
 
 import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperators}
-import org.apache.spark.sql.catalyst.analysis.{OverrideCatalog, OverrideFunctionRegistry}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperators, OverrideCatalog, OverrideFunctionRegistry}
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution.ExtractPythonUdfs
-import org.apache.spark.sql.execution.QueryExecutionException
-import org.apache.spark.sql.execution.{Command => PhysicalCommand}
+import org.apache.spark.sql.catalyst.types.DecimalType
+import org.apache.spark.sql.catalyst.types.decimal.Decimal
+import org.apache.spark.sql.execution.{ExtractPythonUdfs, QueryExecutionException, Command => PhysicalCommand}
 import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand
 import org.apache.spark.sql.sources.DataSourceStrategy
@@ -136,7 +127,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     val relation = EliminateAnalysisOperators(catalog.lookupRelation(None, tableName))
 
     relation match {
-      case relation: MetastoreRelation => {
+      case relation: MetastoreRelation =>
         // This method is mainly based on
         // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
         // in Hive 0.13 (except that we do not use fs.getContentSummary).
@@ -147,7 +138,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
         // countFileSize to count the table size.
         def calculateTableSize(fs: FileSystem, path: Path): Long = {
           val fileStatus = fs.getFileStatus(path)
-          val size = if (fileStatus.isDir) {
+          val size = if (fileStatus.isDirectory) {
             fs.listStatus(path).map(status => calculateTableSize(fs, status.getPath)).sum
           } else {
             fileStatus.getLen
@@ -157,7 +148,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
         }
 
         def getFileSizeForTable(conf: HiveConf, table: Table): Long = {
-          val path = table.getPath()
+          val path = table.getPath
           var size: Long = 0L
           try {
             val fs = path.getFileSystem(conf)
@@ -187,15 +178,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
           val hiveTTable = relation.hiveQlTable.getTTable
           hiveTTable.setParameters(tableParameters)
           val tableFullName =
-            relation.hiveQlTable.getDbName() + "." + relation.hiveQlTable.getTableName()
+            relation.hiveQlTable.getDbName + "." + relation.hiveQlTable.getTableName
 
           catalog.client.alterTable(tableFullName, new Table(hiveTTable))
         }
-      }
       case otherRelation =>
         throw new NotImplementedError(
           s"Analyze has only implemented for Hive tables, " +
-            s"but ${tableName} is a ${otherRelation.nodeName}")
+            s"but $tableName is a ${otherRelation.nodeName}")
     }
   }
 
@@ -374,50 +364,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
 
   /** Extends QueryExecution with hive specific features. */
   protected[sql] abstract class QueryExecution extends super.QueryExecution {
-    protected val primitiveTypes =
-      Seq(StringType, IntegerType, LongType, DoubleType, FloatType, BooleanType, ByteType,
-        ShortType, DateType, TimestampType, BinaryType)
-
-    protected[sql] def toHiveString(a: (Any, DataType)): String = a match {
-      case (struct: Row, StructType(fields)) =>
-        struct.zip(fields).map {
-          case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}"""
-        }.mkString("{", ",", "}")
-      case (seq: Seq[_], ArrayType(typ, _)) =>
-        seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
-      case (map: Map[_,_], MapType(kType, vType, _)) =>
-        map.map {
-          case (key, value) =>
-            toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
-        }.toSeq.sorted.mkString("{", ",", "}")
-      case (null, _) => "NULL"
-      case (d: Date, DateType) => new DateWritable(d).toString
-      case (t: Timestamp, TimestampType) => new TimestampWritable(t).toString
-      case (bin: Array[Byte], BinaryType) => new String(bin, "UTF-8")
-      case (decimal: Decimal, DecimalType()) => // Hive strips trailing zeros so use its toString
-        HiveShim.createDecimal(decimal.toBigDecimal.underlying()).toString
-      case (other, tpe) if primitiveTypes contains tpe => other.toString
-    }
-
-    /** Hive outputs fields of structs slightly differently than top level attributes. */
-    protected def toHiveStructString(a: (Any, DataType)): String = a match {
-      case (struct: Row, StructType(fields)) =>
-        struct.zip(fields).map {
-          case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}"""
-        }.mkString("{", ",", "}")
-      case (seq: Seq[_], ArrayType(typ, _)) =>
-        seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
-      case (map: Map[_, _], MapType(kType, vType, _)) =>
-        map.map {
-          case (key, value) =>
-            toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
-        }.toSeq.sorted.mkString("{", ",", "}")
-      case (null, _) => "null"
-      case (s: String, StringType) => "\"" + s + "\""
-      case (decimal, DecimalType()) => decimal.toString
-      case (other, tpe) if primitiveTypes contains tpe => other.toString
-    }
-
     /**
      * Returns the result as a hive compatible sequence of strings. For native commands, the
      * execution is simply passed back to Hive.
@@ -435,8 +381,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
         // We need the types so we can output struct field names
         val types = analyzed.output.map(_.dataType)
         // Reformat to match hive tab delimited output.
-        val asString = result.map(_.zip(types).map(toHiveString)).map(_.mkString("\t")).toSeq
-        asString
+        result.map(_.zip(types).map(HiveContext.toHiveString)).map(_.mkString("\t")).toSeq
       }
 
     override def simpleString: String =
@@ -447,3 +392,49 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     }
   }
 }
+
+object HiveContext {
+  protected val primitiveTypes =
+    Seq(StringType, IntegerType, LongType, DoubleType, FloatType, BooleanType, ByteType,
+      ShortType, DateType, TimestampType, BinaryType)
+
+  protected[sql] def toHiveString(a: (Any, DataType)): String = a match {
+    case (struct: Row, StructType(fields)) =>
+      struct.zip(fields).map {
+        case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}"""
+      }.mkString("{", ",", "}")
+    case (seq: Seq[_], ArrayType(typ, _)) =>
+      seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
+    case (map: Map[_,_], MapType(kType, vType, _)) =>
+      map.map {
+        case (key, value) =>
+          toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
+      }.toSeq.sorted.mkString("{", ",", "}")
+    case (null, _) => "NULL"
+    case (d: Date, DateType) => new DateWritable(d).toString
+    case (t: Timestamp, TimestampType) => new TimestampWritable(t).toString
+    case (bin: Array[Byte], BinaryType) => new String(bin, "UTF-8")
+    case (decimal: Decimal, DecimalType()) => // Hive strips trailing zeros so use its toString
+      HiveShim.createDecimal(decimal.toBigDecimal.underlying()).toString
+    case (other, tpe) if primitiveTypes contains tpe => other.toString
+  }
+
+  /** Hive outputs fields of structs slightly differently than top level attributes. */
+  protected def toHiveStructString(a: (Any, DataType)): String = a match {
+    case (struct: Row, StructType(fields)) =>
+      struct.zip(fields).map {
+        case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}"""
+      }.mkString("{", ",", "}")
+    case (seq: Seq[_], ArrayType(typ, _)) =>
+      seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
+    case (map: Map[_, _], MapType(kType, vType, _)) =>
+      map.map {
+        case (key, value) =>
+          toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
+      }.toSeq.sorted.mkString("{", ",", "}")
+    case (null, _) => "null"
+    case (s: String, StringType) => "\"" + s + "\""
+    case (decimal, DecimalType()) => decimal.toString
+    case (other, tpe) if primitiveTypes contains tpe => other.toString
+  }
+}
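
Side note for reviewers: both shims now delegate `BinaryType`/`ArrayType`/`StructType`/`MapType` rendering to the shared `HiveContext.toHiveString`, and the string literals asserted in the new SPARK-4407 test come straight from that formatter. Below is a minimal standalone sketch (illustrative only, not the Spark implementation, which also handles structs, decimals, dates, timestamps, and binary) of the MAP/ARRAY rendering the test expects:

```scala
// Standalone sketch of the Hive-style rendering asserted in the SPARK-4407 test.
object HiveStringSketch {
  // MAP(key, value) renders as {key:"value"}; string values are double-quoted.
  def mapToHiveString(m: Map[Int, String]): String =
    m.map { case (k, v) => k.toString + ":\"" + v + "\"" }.mkString("{", ",", "}")

  // ARRAY(...) renders as ["elem",...]; string elements are double-quoted.
  def arrayToHiveString(a: Seq[String]): String =
    a.map(s => "\"" + s + "\"").mkString("[", ",", "]")

  def main(args: Array[String]): Unit = {
    // These match the literals expected by the new assertions in the test suite.
    assert(mapToHiveString(Map(238 -> "val_238")) == """{238:"val_238"}""")
    assert(arrayToHiveString(Seq("238", "val_238")) == """["238","val_238"]""")
  }
}
```

Hoisting the formatter into a companion object, rather than leaving it on `QueryExecution`, is what lets both shims call it directly instead of going through `result.queryExecution.asInstanceOf[HiveContext#QueryExecution]` as Shim12 did before.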
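
For the SPARK-4309 half, a hypothetical client-side session shows what the new `DateType` branches enable over JDBC. This is a placeholder sketch, not code from the patch: the URL, credentials, and table name are illustrative, and it assumes the Hive JDBC driver is on the classpath and a Thrift server is running:

```scala
import java.sql.{Date, DriverManager}

// Hypothetical JDBC client: before this patch the shims had no DateType case,
// so fetching a DATE column failed; with it, getDate returns a java.sql.Date.
object DateOverJdbcSketch {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    // Placeholder URL; point this at an actual HiveThriftServer2 instance.
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
    try {
      // some_table is any existing table with at least one row.
      val rs = conn.createStatement().executeQuery(
        "SELECT CAST('2011-01-01' AS DATE) FROM some_table LIMIT 1")
      assert(rs.next())
      val d: Date = rs.getDate(1)
      assert(d == Date.valueOf("2011-01-01"))
    } finally {
      conn.close()
    }
  }
}
```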