[SPARK-5135][SQL] Add support for describe table to DDL in SQLContext
Hi rxin, marmbrus,
I considered your suggestion (in #4127) and rewrote it; it is now up to date.
Could you please review it?

Author: OopsOutOfMemory <[email protected]>

Closes #4227 from OopsOutOfMemory/describe and squashes the following commits:

053826f [OopsOutOfMemory] describe
OopsOutOfMemory authored and rxin committed Feb 5, 2015
1 parent a83936e commit 4d8d070
Showing 10 changed files with 190 additions and 28 deletions.
@@ -227,8 +227,9 @@ abstract class DataType {
def json: String = compact(render(jsonValue))

def prettyJson: String = pretty(render(jsonValue))
}

def simpleString: String = typeName
}

/**
* :: DeveloperApi ::
@@ -242,7 +243,6 @@ case object NullType extends DataType {
override def defaultSize: Int = 1
}


protected[sql] object NativeType {
val all = Seq(
IntegerType, BooleanType, LongType, DoubleType, FloatType, ShortType, ByteType, StringType)
@@ -448,6 +448,8 @@ case object LongType extends IntegralType {
* The default size of a value of the LongType is 8 bytes.
*/
override def defaultSize: Int = 8

override def simpleString = "bigint"
}


@@ -470,6 +472,8 @@ case object IntegerType extends IntegralType {
* The default size of a value of the IntegerType is 4 bytes.
*/
override def defaultSize: Int = 4

override def simpleString = "int"
}


@@ -492,6 +496,8 @@ case object ShortType extends IntegralType {
* The default size of a value of the ShortType is 2 bytes.
*/
override def defaultSize: Int = 2

override def simpleString = "smallint"
}


@@ -514,6 +520,8 @@ case object ByteType extends IntegralType {
* The default size of a value of the ByteType is 1 byte.
*/
override def defaultSize: Int = 1

override def simpleString = "tinyint"
}


@@ -573,6 +581,11 @@ case class DecimalType(precisionInfo: Option[PrecisionInfo]) extends FractionalT
* The default size of a value of the DecimalType is 4096 bytes.
*/
override def defaultSize: Int = 4096

override def simpleString = precisionInfo match {
case Some(PrecisionInfo(precision, scale)) => s"decimal($precision,$scale)"
case None => "decimal(10,0)"
}
}


@@ -695,6 +708,8 @@ case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataT
* (We assume that there are 100 elements).
*/
override def defaultSize: Int = 100 * elementType.defaultSize

override def simpleString = s"array<${elementType.simpleString}>"
}


@@ -870,6 +885,11 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
* The default size of a value of the StructType is the total default sizes of all field types.
*/
override def defaultSize: Int = fields.map(_.dataType.defaultSize).sum

override def simpleString = {
val fieldTypes = fields.map(field => s"${field.name}:${field.dataType.simpleString}")
s"struct<${fieldTypes.mkString(",")}>"
}
}


@@ -920,6 +940,8 @@ case class MapType(
* (We assume that there are 100 elements).
*/
override def defaultSize: Int = 100 * (keyType.defaultSize + valueType.defaultSize)

override def simpleString = s"map<${keyType.simpleString},${valueType.simpleString}>"
}
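
These simpleString overrides compose recursively for nested types, with the base class defaulting to typeName. A minimal sketch of the resulting renderings, assuming a Spark build that includes this patch (the demo object name is invented):

import org.apache.spark.sql.types._

object SimpleStringDemo extends App {
  // Primitive types render with the Hive-style names added above.
  println(LongType.simpleString)           // bigint
  println(DecimalType(5, 1).simpleString)  // decimal(5,1)
  // Complex types embed the simpleString of their element types.
  println(MapType(StringType, ArrayType(IntegerType)).simpleString)
  // map<string,array<int>>
  println(StructType(Seq(
    StructField("id", LongType),
    StructField("price", DecimalType(5, 1)))).simpleString)
  // struct<id:bigint,price:decimal(5,1)>
}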


@@ -26,9 +26,10 @@ import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
import org.apache.spark.sql.parquet._
import org.apache.spark.sql.types._
import org.apache.spark.sql.sources.{DescribeCommand => LogicalDescribeCommand}
import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
import org.apache.spark.sql.sources._


private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
self: SQLContext#SparkPlanner =>

@@ -337,6 +338,16 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case c: CreateTableUsingAsLogicalPlan if c.temporary && c.allowExisting =>
sys.error("allowExisting should be set to false when creating a temporary table.")

case LogicalDescribeCommand(table, isExtended) =>
val resultPlan = self.sqlContext.executePlan(table).executedPlan
ExecutedCommand(
RunnableDescribeCommand(resultPlan, resultPlan.output, isExtended)) :: Nil

case _ => Nil
}
}
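
This strategy case is what lets a plain SQLContext plan DESCRIBE end to end. A hedged sketch of the full path, reusing the data source from this PR's test suite (an existing SparkContext `sc` is assumed):

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext.sql(
  """CREATE TEMPORARY TABLE ddlPeople
    |USING org.apache.spark.sql.sources.DDLScanSource
    |OPTIONS (From '1', To '10')""".stripMargin)
// "DESCRIBE ddlPeople" is parsed by DDLParser.describeTable into the logical
// sources.DescribeCommand, which the case above plans as the physical
// execution.DescribeCommand:
sqlContext.sql("DESCRIBE ddlPeople").collect().foreach(println)
// first row: [intType,int,test comment]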
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Row, Attribute}
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import scala.collection.mutable.ArrayBuffer

/**
* A logical command that is executed for its side-effects. `RunnableCommand`s are
@@ -176,9 +177,14 @@ case class UncacheTableCommand(tableName: String) extends RunnableCommand {
@DeveloperApi
case class DescribeCommand(
child: SparkPlan,
override val output: Seq[Attribute]) extends RunnableCommand {
override val output: Seq[Attribute],
isExtended: Boolean) extends RunnableCommand {

override def run(sqlContext: SQLContext) = {
child.output.map(field => Row(field.name, field.dataType.toString, null))
child.schema.fields.map { field =>
val cmtKey = "comment"
val comment = if (field.metadata.contains(cmtKey)) field.metadata.getString(cmtKey) else ""
Row(field.name, field.dataType.simpleString, comment)
}
}
}
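
run now reads an optional "comment" key from each field's metadata, so data sources can surface column comments through DESCRIBE. A small sketch of attaching one, using the same MetadataBuilder API the new test below relies on (the field itself is made up):

import org.apache.spark.sql.types._

val commented = StructField(
  "id", IntegerType, nullable = false,
  new MetadataBuilder().putString("comment", "primary key").build())
// For this field, DescribeCommand would emit: Row("id", "int", "primary key")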
39 changes: 37 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -23,6 +23,8 @@ import org.apache.spark.Logging
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -50,7 +52,6 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
}
}


// `Keyword` is a convention of AbstractSparkSQLParser, which scans all of the `Keyword`
// properties of this class via reflection at runtime to construct the SqlLexical object
protected val CREATE = Keyword("CREATE")
@@ -61,6 +62,8 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
protected val EXISTS = Keyword("EXISTS")
protected val USING = Keyword("USING")
protected val OPTIONS = Keyword("OPTIONS")
protected val DESCRIBE = Keyword("DESCRIBE")
protected val EXTENDED = Keyword("EXTENDED")
protected val AS = Keyword("AS")
protected val COMMENT = Keyword("COMMENT")

@@ -82,7 +85,7 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
protected val MAP = Keyword("MAP")
protected val STRUCT = Keyword("STRUCT")

protected lazy val ddl: Parser[LogicalPlan] = createTable
protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable

protected def start: Parser[LogicalPlan] = ddl

@@ -136,6 +139,22 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {

protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")"

/*
 * DESCRIBE [EXTENDED] [dbName.]tableName, e.g. DESCRIBE avroTable
 * This displays all columns of table `avroTable`, giving each column's
 * name, type, and comment.
 */
protected lazy val describeTable: Parser[LogicalPlan] =
(DESCRIBE ~> opt(EXTENDED)) ~ (ident <~ ".").? ~ ident ^^ {
case e ~ db ~ tbl =>
val tblIdentifier = db match {
case Some(dbName) =>
Seq(dbName, tbl)
case None =>
Seq(tbl)
}
DescribeCommand(UnresolvedRelation(tblIdentifier, None), e.isDefined)
}
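
For reference, a hedged reading of what the combinators above accept and produce:

// DESCRIBE myTable          -> DescribeCommand(UnresolvedRelation(Seq("myTable"), None), isExtended = false)
// DESCRIBE EXTENDED myTable -> same, but isExtended = true
// DESCRIBE mydb.myTable     -> table identifier Seq("mydb", "myTable")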

protected lazy val options: Parser[Map[String, String]] =
"(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap }

@@ -274,6 +293,22 @@ object ResolvedDataSource {

private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRelation)

/**
* Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command.
* @param table The table to be described.
* @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false.
* It is effective only when the table is a Hive table.
*/
private[sql] case class DescribeCommand(
table: LogicalPlan,
isExtended: Boolean) extends Command {
override def output = Seq(
// Column names are based on Hive.
AttributeReference("col_name", StringType, nullable = false)(),
AttributeReference("data_type", StringType, nullable = false)(),
AttributeReference("comment", StringType, nullable = false)())
}
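
Because the output schema is fixed to (col_name, data_type, comment), a DESCRIBE result can be consumed like any other query. A brief sketch, assuming the sqlContext and temporary table from the planning example earlier:

val described = sqlContext.sql("DESCRIBE ddlPeople")
println(described.schema.fieldNames.mkString(", "))  // col_name, data_type, comment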

private[sql] case class CreateTableUsing(
tableName: String,
userSpecifiedSchema: Option[StructType],
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources

import org.apache.spark.sql._
import org.apache.spark.sql.types._

class DDLScanSource extends RelationProvider {
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
SimpleDDLScan(parameters("from").toInt, parameters("TO").toInt)(sqlContext)
}
}

case class SimpleDDLScan(from: Int, to: Int)(@transient val sqlContext: SQLContext)
extends TableScan {

override def schema =
StructType(Seq(
StructField("intType", IntegerType, nullable = false,
new MetadataBuilder().putString("comment", "test comment").build()),
StructField("stringType", StringType, nullable = false),
StructField("dateType", DateType, nullable = false),
StructField("timestampType", TimestampType, nullable = false),
StructField("doubleType", DoubleType, nullable = false),
StructField("bigintType", LongType, nullable = false),
StructField("tinyintType", ByteType, nullable = false),
StructField("decimalType", DecimalType.Unlimited, nullable = false),
StructField("fixedDecimalType", DecimalType(5,1), nullable = false),
StructField("binaryType", BinaryType, nullable = false),
StructField("booleanType", BooleanType, nullable = false),
StructField("smallIntType", ShortType, nullable = false),
StructField("floatType", FloatType, nullable = false),
StructField("mapType", MapType(StringType, StringType)),
StructField("arrayType", ArrayType(StringType)),
StructField("structType",
StructType(StructField("f1",StringType) ::
(StructField("f2",IntegerType)) :: Nil
)
)
))


override def buildScan() = sqlContext.sparkContext.parallelize(from to to).
map(e => Row(s"people$e", e * 2))
}

class DDLTestSuite extends DataSourceTest {
import caseInsensisitiveContext._

before {
sql(
"""
|CREATE TEMPORARY TABLE ddlPeople
|USING org.apache.spark.sql.sources.DDLScanSource
|OPTIONS (
| From '1',
| To '10'
|)
""".stripMargin)
}

sqlTest(
"describe ddlPeople",
Seq(
Row("intType", "int", "test comment"),
Row("stringType", "string", ""),
Row("dateType", "date", ""),
Row("timestampType", "timestamp", ""),
Row("doubleType", "double", ""),
Row("bigintType", "bigint", ""),
Row("tinyintType", "tinyint", ""),
Row("decimalType", "decimal(10,0)", ""),
Row("fixedDecimalType", "decimal(5,1)", ""),
Row("binaryType", "binary", ""),
Row("booleanType", "boolean", ""),
Row("smallIntType", "smallint", ""),
Row("floatType", "float", ""),
Row("mapType", "map<string,string>", ""),
Row("arrayType", "array<string>", ""),
Row("structType", "struct<f1:string,f2:int>", "")
))
}
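
To run just this suite locally, something like the following should work (hedged; the exact invocation depends on the checkout's build setup):

build/sbt "sql/test-only org.apache.spark.sql.sources.DDLTestSuite"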
@@ -75,7 +75,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
DataFrame(this,
ddlParser(sqlText, exceptionOnError = false).getOrElse(HiveQl.parseSql(substituted)))
} else {
sys.error(s"Unsupported SQL dialect: ${conf.dialect}. Try 'sql' or 'hiveql'")
sys.error(s"Unsupported SQL dialect: ${conf.dialect}. Try 'sql' or 'hiveql'")
}
}

17 changes: 1 addition & 16 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.ExplainCommand
import org.apache.spark.sql.sources.DescribeCommand
import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DropTable, AnalyzeTable, HiveScriptIOSchema}
import org.apache.spark.sql.types._

@@ -47,22 +48,6 @@ import scala.collection.JavaConversions._
*/
private[hive] case object NativePlaceholder extends Command

/**
* Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command.
* @param table The table to be described.
* @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false.
* It is effective only when the table is a Hive table.
*/
case class DescribeCommand(
table: LogicalPlan,
isExtended: Boolean) extends Command {
override def output = Seq(
// Column names are based on Hive.
AttributeReference("col_name", StringType, nullable = false)(),
AttributeReference("data_type", StringType, nullable = false)(),
AttributeReference("comment", StringType, nullable = false)())
}

/** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
private[hive] object HiveQl {
protected val nativeCommands = Seq(
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.sources.DescribeCommand
import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive.execution._
@@ -240,8 +241,11 @@ private[hive] trait HiveStrategies {
case t: MetastoreRelation =>
ExecutedCommand(
DescribeHiveTableCommand(t, describe.output, describe.isExtended)) :: Nil

case o: LogicalPlan =>
ExecutedCommand(RunnableDescribeCommand(planLater(o), describe.output)) :: Nil
val resultPlan = context.executePlan(o).executedPlan
ExecutedCommand(RunnableDescribeCommand(
resultPlan, describe.output, describe.isExtended)) :: Nil
}

case _ => Nil
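
With this change, DESCRIBE on a non-Hive (data source) table inside a HiveContext takes the same RunnableDescribeCommand path as in SQLContext, instead of the two-argument call it used before. A hedged sketch (the table registration is assumed to have happened in this context):

val hive = new org.apache.spark.sql.hive.HiveContext(sc)
hive.sql("DESCRIBE ddlPeople").collect()  // planned via RunnableDescribeCommand, not DescribeHiveTableCommand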
