diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
index e1cbe6650aaaf..390837f608951 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
@@ -224,8 +224,9 @@ abstract class DataType {
   def json: String = compact(render(jsonValue))

   def prettyJson: String = pretty(render(jsonValue))
-}

+  def toSimpleString: String = typeName
+}

 /**
  * :: DeveloperApi ::
@@ -235,8 +236,9 @@ abstract class DataType {
  * @group dataType
  */
 @DeveloperApi
-case object NullType extends DataType
-
+case object NullType extends DataType {
+  override def toSimpleString = "null"
+}

 object NativeType {
   val all = Seq(
@@ -300,6 +302,7 @@ case object StringType extends NativeType with PrimitiveType {
   private[sql] type JvmType = String
   @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
   private[sql] val ordering = implicitly[Ordering[JvmType]]
+  override def toSimpleString = "string"
 }


@@ -324,6 +327,7 @@ case object BinaryType extends NativeType with PrimitiveType {
       x.length - y.length
     }
   }
+  override def toSimpleString = "binary"
 }


@@ -339,6 +343,7 @@ case object BooleanType extends NativeType with PrimitiveType {
   private[sql] type JvmType = Boolean
   @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
   private[sql] val ordering = implicitly[Ordering[JvmType]]
+  override def toSimpleString = "boolean"
 }


@@ -359,6 +364,7 @@ case object TimestampType extends NativeType {
   private[sql] val ordering = new Ordering[JvmType] {
     def compare(x: Timestamp, y: Timestamp) = x.compareTo(y)
   }
+  override def toSimpleString = "timestamp"
 }


@@ -379,6 +385,7 @@ case object DateType extends NativeType {
   private[sql] val ordering = new Ordering[JvmType] {
     def compare(x: Date, y: Date) = x.compareTo(y)
   }
+  override def toSimpleString = "date"
 }


@@ -425,6 +432,7 @@ case object LongType extends IntegralType {
   private[sql] val numeric = implicitly[Numeric[Long]]
   private[sql] val integral = implicitly[Integral[Long]]
   private[sql] val ordering = implicitly[Ordering[JvmType]]
+  override def toSimpleString = "bigint"
 }


@@ -442,6 +450,7 @@ case object IntegerType extends IntegralType {
   private[sql] val numeric = implicitly[Numeric[Int]]
   private[sql] val integral = implicitly[Integral[Int]]
   private[sql] val ordering = implicitly[Ordering[JvmType]]
+  override def toSimpleString = "int"
 }


@@ -459,6 +468,7 @@ case object ShortType extends IntegralType {
   private[sql] val numeric = implicitly[Numeric[Short]]
   private[sql] val integral = implicitly[Integral[Short]]
   private[sql] val ordering = implicitly[Ordering[JvmType]]
+  override def toSimpleString = "smallint"
 }


@@ -476,6 +486,7 @@ case object ByteType extends IntegralType {
   private[sql] val numeric = implicitly[Numeric[Byte]]
   private[sql] val integral = implicitly[Integral[Byte]]
   private[sql] val ordering = implicitly[Ordering[JvmType]]
+  override def toSimpleString = "tinyint"
 }


@@ -530,6 +541,11 @@ case class DecimalType(precisionInfo: Option[PrecisionInfo]) extends FractionalType {
     case Some(PrecisionInfo(precision, scale)) => s"DecimalType($precision,$scale)"
     case None => "DecimalType()"
   }
+
+  override def toSimpleString = precisionInfo match {
+    case Some(PrecisionInfo(precision, scale)) => s"decimal($precision,$scale)"
+    case None => "decimal(10,0)"
+  }
 }


@@ -580,6 +596,7 @@ case object DoubleType extends FractionalType {
   private[sql] val fractional = implicitly[Fractional[Double]]
   private[sql] val ordering = implicitly[Ordering[JvmType]]
   private[sql] val asIntegral = DoubleAsIfIntegral
+  override def toSimpleString = "double"
 }


@@ -598,6 +615,7 @@ case object FloatType extends FractionalType {
   private[sql] val fractional = implicitly[Fractional[Float]]
   private[sql] val ordering = implicitly[Ordering[JvmType]]
   private[sql] val asIntegral = FloatAsIfIntegral
+  override def toSimpleString = "float"
 }


@@ -636,6 +654,8 @@ case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType {
     ("type" -> typeName) ~
       ("elementType" -> elementType.jsonValue) ~
       ("containsNull" -> containsNull)
+
+  override def toSimpleString = s"array<${elementType.toSimpleString}>"
 }


@@ -805,6 +825,11 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {
   override def length: Int = fields.length

   override def iterator: Iterator[StructField] = fields.iterator
+
+  override def toSimpleString = {
+    val fieldTypes = fields.map(field => s"${field.name}:${field.dataType.toSimpleString}")
+    s"struct<${fieldTypes.mkString(",")}>"
+  }
 }


@@ -848,6 +873,8 @@ case class MapType(
     ("keyType" -> keyType.jsonValue) ~
       ("valueType" -> valueType.jsonValue) ~
       ("valueContainsNull" -> valueContainsNull)
+
+  override def toSimpleString = s"map<${keyType.toSimpleString},${valueType.toSimpleString}>"
 }
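Purely as an illustration of the new simple-string forms (the expected values below follow directly from the overrides added above; the companion `apply` shortcuts such as `DecimalType(5, 1)` and `ArrayType(StringType)` are the existing helpers):

import org.apache.spark.sql.types._

IntegerType.toSimpleString                       // "int"
DecimalType(5, 1).toSimpleString                 // "decimal(5,1)"
DecimalType.Unlimited.toSimpleString             // "decimal(10,0)"
ArrayType(StringType).toSimpleString             // "array<string>"
MapType(StringType, IntegerType).toSimpleString  // "map<string,int>"
StructType(Seq(
  StructField("name", StringType),
  StructField("scores", ArrayType(DoubleType)))).toSimpleString
// "struct<name:string,scores:array<double>>"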
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 52a31f01a4358..701b69aa3d05b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -21,10 +21,12 @@ import org.apache.spark.Logging
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{SchemaRDD, SQLConf, SQLContext}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Row, Attribute}
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import scala.collection.mutable.ArrayBuffer

 /**
  * A logical command that is executed for its side-effects.  `RunnableCommand`s are
@@ -178,3 +180,34 @@ case class DescribeCommand(
     child.output.map(field => Row(field.name, field.dataType.toString, null))
   }
 }
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class DDLDescribeCommand(
+    dbName: Option[String],
+    tableName: String,
+    isExtended: Boolean) extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext) = {
+    val tblRelation = dbName match {
+      case Some(db) => UnresolvedRelation(Seq(db, tableName))
+      case None => UnresolvedRelation(Seq(tableName))
+    }
+    val logicalRelation = sqlContext.executePlan(tblRelation).analyzed
+    val rows = new ArrayBuffer[Row]()
+    rows ++= logicalRelation.schema.fields.map { field =>
+      Row(field.name, field.dataType.toSimpleString, null)
+    }
+
+    /*
+     * TODO: when partitioned tables are supported, add the header below:
+     * # Partition Information
+     * # col_name  data_type  comment
+     */
+    if (isExtended) { // TODO: add the extra output of DESCRIBE EXTENDED
+      // rows += Row("# extended", null, null)
+    }
+    rows
+  }
+}
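For context, a rough sketch of what the new command returns when invoked directly; the table name and schema here are made up for illustration, and `sqlContext` is assumed to already have such a table registered:

import org.apache.spark.sql.execution.DDLDescribeCommand

// Assuming a registered table "people" with schema (name string, age int):
val rows = DDLDescribeCommand(dbName = None, tableName = "people", isExtended = false).run(sqlContext)
// rows should contain: Row("name", "string", null), Row("age", "int", null)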
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 381298caba6f2..34c7a57187b76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -25,9 +25,10 @@ import org.apache.spark.Logging
 import org.apache.spark.sql.{SchemaRDD, SQLContext}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.SqlLexical
-import org.apache.spark.sql.execution.RunnableCommand
+import org.apache.spark.sql.execution.{DDLDescribeCommand, RunnableCommand}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation

 /**
  * A parser for foreign DDL commands.
@@ -61,6 +62,8 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers with Logging {
   protected val TABLE = Keyword("TABLE")
   protected val USING = Keyword("USING")
   protected val OPTIONS = Keyword("OPTIONS")
+  protected val DESCRIBE = Keyword("DESCRIBE")
+  protected val EXTENDED = Keyword("EXTENDED")

   // Data types.
   protected val STRING = Keyword("STRING")
@@ -89,7 +92,7 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers with Logging {

   override val lexical = new SqlLexical(reservedWords)

-  protected lazy val ddl: Parser[LogicalPlan] = createTable
+  protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable

   /**
    * `CREATE [TEMPORARY] TABLE avroTable
@@ -112,6 +115,16 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers with Logging {
   protected lazy val tableCols: Parser[Seq[StructField]] =
     "(" ~> repsep(column, ",") <~ ")"

+  /*
+   * DESCRIBE [EXTENDED] [dbName.]tableName, e.g. DESCRIBE avroTable
+   * This will display all columns of the table, including column_name, column_type and whether the column is nullable.
+   */
+  protected lazy val describeTable: Parser[LogicalPlan] =
+    (DESCRIBE ~> opt(EXTENDED)) ~ (ident <~ ".").? ~ ident ^^ {
+      case e ~ db ~ tbl =>
+        DDLDescribeCommand(db, tbl, e.nonEmpty)
+    }
+
   protected lazy val options: Parser[Map[String, String]] =
     "(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap }
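As a quick illustration of the grammar above (table and database names are invented; routing of plain `sql(...)` calls through the DDL parser is assumed, as for the existing createTable rule):

sqlContext.sql("DESCRIBE ddlPeople")
// parses to DDLDescribeCommand(None, "ddlPeople", isExtended = false)

sqlContext.sql("DESCRIBE EXTENDED someDb.ddlPeople")
// parses to DDLDescribeCommand(Some("someDb"), "ddlPeople", isExtended = true)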
+ map(e => Row(s"people$e",e*2)) +} + +class DDLTestSuit extends DataSourceTest { + import caseInsensisitiveContext._ + + before { + sql( + """ + |CREATE TEMPORARY TABLE ddlPeople + |USING org.apache.spark.sql.sources.DDLScanSource + |OPTIONS ( + | From '1', + | To '10' + |) + """.stripMargin) + } + + sqlTest( + "describe ddlPeople", + Seq( + Row("intType", "int", null), + Row("stringType", "string", null), + Row("dateType", "date", null), + Row("timestampType", "timestamp", null), + Row("doubleType", "double", null), + Row("bigintType", "bigint", null), + Row("tinyintType", "tinyint", null), + Row("decimalType", "decimal(10,0)", null), + Row("fixedDecimalType", "decimal(5,1)", null), + Row("binaryType", "binary", null), + Row("booleanType", "boolean", null), + Row("smallIntType", "smallint", null), + Row("floatType", "float", null), + Row("mapType", "map", null), + Row("arrayType", "array", null), + Row("structType", "struct", null) + )) + + sqlTest( + "describe extended ddlPeople", + Seq( + Row("intType", "int", null), + Row("stringType", "string", null), + Row("dateType", "date", null), + Row("timestampType", "timestamp", null), + Row("doubleType", "double", null), + Row("bigintType", "bigint", null), + Row("tinyintType", "tinyint", null), + Row("decimalType", "decimal(10,0)", null), + Row("fixedDecimalType", "decimal(5,1)", null), + Row("binaryType", "binary", null), + Row("booleanType", "boolean", null), + Row("smallIntType", "smallint", null), + Row("floatType", "float", null), + Row("mapType", "map", null), + Row("arrayType", "array", null), + Row("structType", "struct", null) + // Row("# extended", null, null) + )) +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 4246b8b0918fd..f1618508ebeff 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -40,6 +40,7 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperat import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, SetCommand, QueryExecutionException} import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DescribeHiveTableCommand} +import org.apache.spark.sql.hive.HiveQl.ParseException import org.apache.spark.sql.sources.DataSourceStrategy import org.apache.spark.sql.types._ @@ -70,8 +71,15 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { if (conf.dialect == "sql") { super.sql(sqlText) } else if (conf.dialect == "hiveql") { - new SchemaRDD(this, ddlParser(sqlText).getOrElse(HiveQl.parseSql(sqlText))) - } else { + val ddlPlan = ddlParser(sqlText) + val basicPlan = try { + HiveQl.parseSql(sqlText) + }catch { + case e: Exception if ddlPlan.nonEmpty => ddlPlan.get + case e: Throwable => throw e + } + new SchemaRDD(this, basicPlan) + } else { sys.error(s"Unsupported SQL dialect: ${conf.dialect}. Try 'sql' or 'hiveql'") } }