[SPARK-7411] [SQL] Support SerDe for HiveQl in CTAS #5963

Closed

Changes from all commits (hunks below are shown as the resulting code after the change)
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

@@ -407,64 +407,58 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive

 * For example, because of a CREATE TABLE X AS statement.
 */
object CreateTables extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Wait until children are resolved.
    case p: LogicalPlan if !p.childrenResolved => p
    case p: LogicalPlan if p.resolved => p

    case p @ CreateTableAsSelect(table, child, allowExisting) =>
      val schema = if (table.schema.size > 0) {
        table.schema
      } else {
        child.output.map { attr =>
          new HiveColumn(
            attr.name,
            HiveMetastoreTypes.toMetastoreType(attr.dataType), null)
        }
      }

      val desc = table.copy(schema = schema)

      if (hive.convertCTAS && table.serde.isEmpty) {
        // Do the conversion when spark.sql.hive.convertCTAS is true and the query
        // does not specify any storage format (file format and storage handler).
        if (table.specifiedDatabase.isDefined) {
          throw new AnalysisException(
            "Cannot specify database name in a CTAS statement " +
              "when spark.sql.hive.convertCTAS is set to true.")
        }

        val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
        CreateTableUsingAsSelect(
          desc.name,
          hive.conf.defaultDataSourceName,
          temporary = false,
          mode,
          options = Map.empty[String, String],
          child
        )
      } else {
        val desc = if (table.serde.isEmpty) {
          // add default serde
          table.copy(
            serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
        } else {
          table
        }

        val (dbName, tblName) =
          processDatabaseAndTableName(
            desc.specifiedDatabase.getOrElse(client.currentDatabase), desc.name)

        execution.CreateTableAsSelect(
          desc.copy(
            specifiedDatabase = Some(dbName),
            name = tblName),
          child,
          allowExisting)
      }
  }
}
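A quick illustration of how this rule behaves. This is a sketch, not part of the diff; it assumes a HiveContext named hiveContext and an existing table src:

// With spark.sql.hive.convertCTAS=true and no storage clause, table.serde is empty,
// so the CTAS is rewritten into CreateTableUsingAsSelect and the table is written
// with the default data source (spark.sql.sources.default) instead of a Hive SerDe.
hiveContext.sql("SET spark.sql.hive.convertCTAS=true")
hiveContext.sql("CREATE TABLE t1 AS SELECT key, value FROM src")

// An explicit storage clause defines a SerDe, so the rule keeps the Hive path
// (execution.CreateTableAsSelect) even with convertCTAS enabled.
hiveContext.sql("CREATE TABLE t2 STORED AS ORC AS SELECT key, value FROM src")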
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala (207 changes: 172 additions & 35 deletions)
@@ -22,14 +22,15 @@ import java.sql.Date

import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.ql.{ErrorMsg, Context}
import org.apache.hadoop.hive.ql.exec.{FunctionRegistry, FunctionInfo}
import org.apache.hadoop.hive.ql.lib.Node
import org.apache.hadoop.hive.ql.metadata.Table
import org.apache.hadoop.hive.ql.parse._
import org.apache.hadoop.hive.ql.plan.PlanUtils
import org.apache.hadoop.hive.ql.session.SessionState

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
@@ -62,7 +63,13 @@ case class CreateTableAsSelect(

    allowExisting: Boolean) extends UnaryNode with Command {

  override def output: Seq[Attribute] = Seq.empty[Attribute]
  override lazy val resolved: Boolean =
    tableDesc.specifiedDatabase.isDefined &&
    tableDesc.schema.size > 0 &&
    tableDesc.serde.isDefined &&
    tableDesc.inputFormat.isDefined &&
    tableDesc.outputFormat.isDefined &&
    childrenResolved
}
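In other words, a CTAS plan now stays unresolved until its table description carries a schema and a concrete storage format, which is what forces the CreateTables rule above to fill in defaults before planning. A minimal sketch, with field names taken from this diff and values hypothetical:

// This tableDesc leaves CreateTableAsSelect unresolved: the schema is known but
// serde/inputFormat/outputFormat are still None, so the analyzer keeps iterating.
val tableDesc = HiveTable(
  specifiedDatabase = Some("default"),
  name = "t",
  schema = Seq(HiveColumn("key", "int", null)),
  partitionColumns = Seq.empty[HiveColumn],
  properties = Map.empty[String, String],
  serdeProperties = Map.empty[String, String],
  tableType = ManagedTable,
  location = None,
  inputFormat = None,  // resolved only once defined
  outputFormat = None, // resolved only once defined
  serde = None,        // resolved only once defined
  viewText = None)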

/** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
Expand Down Expand Up @@ -240,12 +247,23 @@ private[hive] object HiveQl {
* Otherwise, there will be Null pointer exception,
* when retrieving properties form HiveConf.
*/
val hContext = new Context(new HiveConf())
val hContext = new Context(hiveConf)
val node = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql, hContext))
hContext.clear()
node
}

/**
* Returns the HiveConf
*/
private[this] def hiveConf(): HiveConf = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nitin2goyal what do you think of this implementation? I think it will also give you the speed up you are looking for as long as you are parsing queries with a HiveContext. It also has the advantage of using and conf options that are SET by the user.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems fine to me. Since it's already been done here, should I do just this change for 1.2 and 1.3 branches ?
Issue - https://issues.apache.org/jira/browse/SPARK-7331

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds good to me.
On May 9, 2015 8:13 PM, "nitin2goyal" [email protected] wrote:

In sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
#5963 (comment):

 val node = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql, hContext))
 hContext.clear()
 node

}

  • /**
  • * Returns the HiveConf
  • * TODO get it from HiveContext?
  • */
  • private[this] def hiveConf(): HiveConf = {

Seems fine to me. Since it's already been done here, should I do just this
change for 1.2 and 1.3 branches ?
Issue - https://issues.apache.org/jira/browse/SPARK-7331


Reply to this email directly or view it on GitHub
https://github.com/apache/spark/pull/5963/files#r29998407.

val ss = SessionState.get() // SessionState is lazy initializaion, it can be null here
if (ss == null) {
new HiveConf()
} else {
ss.getConf
}
}
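The fallback can also be read as a single expression. A condensed sketch of the same behavior (the helper name currentHiveConf is hypothetical):

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.session.SessionState

// Prefer the conf of the live session, so user-level SET options are visible
// to the parser; fall back to a fresh HiveConf when no session exists.
def currentHiveConf(): HiveConf =
  Option(SessionState.get()).map(_.getConf).getOrElse(new HiveConf())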

  /** Returns a LogicalPlan for a given HiveQL string. */
  def parseSql(sql: String): LogicalPlan = hqlParser.parse(sql)
@@ -476,8 +494,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C

      DropTable(tableName, ifExists.nonEmpty)

    // Support "ANALYZE TABLE tableName COMPUTE STATISTICS noscan"
    case Token("TOK_ANALYZE",
           Token("TOK_TAB", Token("TOK_TABNAME", tableNameParts) :: partitionSpec) ::
           isNoscan) =>
      // Reference:
      // https://cwiki.apache.org/confluence/display/Hive/StatsDev#StatsDev-ExistingTables
      if (partitionSpec.nonEmpty) {
@@ -547,13 +565,15 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C

        val (
            Some(tableNameParts) ::
            _ /* likeTable */ ::
            externalTable ::
            Some(query) ::
            allowExisting +:
            ignores) =
          getClauses(
            Seq(
              "TOK_TABNAME",
              "TOK_LIKETABLE",
              "EXTERNAL",
              "TOK_QUERY",
              "TOK_IFNOTEXISTS",
              "TOK_TABLECOMMENT",

@@ -576,43 +596,153 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C

            children)
        val (db, tableName) = extractDbNameTableName(tableNameParts)
        // TODO add bucket support
        var tableDesc: HiveTable = HiveTable(
          specifiedDatabase = db,
          name = tableName,
          schema = Seq.empty[HiveColumn],
          partitionColumns = Seq.empty[HiveColumn],
          properties = Map[String, String](),
          serdeProperties = Map[String, String](),
          tableType = if (externalTable.isDefined) ExternalTable else ManagedTable,
          location = None,
          inputFormat = None,
          outputFormat = None,
          serde = None,
          viewText = None)

        // default storage type abbreviation (e.g. RCFile, ORC, PARQUET etc.)
        val defaultStorageType = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT)
        // handle the default format for the storage type abbreviation
        tableDesc = if ("SequenceFile".equalsIgnoreCase(defaultStorageType)) {
          tableDesc.copy(
            inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
            outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat"))
        } else if ("RCFile".equalsIgnoreCase(defaultStorageType)) {
          tableDesc.copy(
            inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
            outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"),
            serde = Option(hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE)))
        } else if ("ORC".equalsIgnoreCase(defaultStorageType)) {
          tableDesc.copy(
            inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
            outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
            serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
        } else if ("PARQUET".equalsIgnoreCase(defaultStorageType)) {
          tableDesc.copy(
            inputFormat =
              Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
            outputFormat =
              Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
            serde =
              Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
        } else {
          tableDesc.copy(
            inputFormat =
              Option("org.apache.hadoop.mapred.TextInputFormat"),
            outputFormat =
              Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
        }
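For instance, under the default-format logic above, a CTAS that names no storage clause should inherit the session's hive.default.fileformat. A sketch assuming hiveContext, a table src, and spark.sql.hive.convertCTAS left at its default of false:

// TextFile default: inputFormat/outputFormat are set here, serde stays None and
// is filled in later (LazySimpleSerDe, via the CreateTables rule shown earlier).
hiveContext.sql("CREATE TABLE t3 AS SELECT key, value FROM src")

// ORC default: the branch above also picks OrcSerde up front.
hiveContext.sql("SET hive.default.fileformat=ORC")
hiveContext.sql("CREATE TABLE t3_orc AS SELECT key, value FROM src")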

        children.collect {
          case list @ Token("TOK_TABCOLLIST", _) =>
            val cols = BaseSemanticAnalyzer.getColumns(list, true)
            if (cols != null) {
              tableDesc = tableDesc.copy(
                schema = cols.map { field =>
                  HiveColumn(field.getName, field.getType, field.getComment)
                })
            }
          case Token("TOK_TABLECOMMENT", child :: Nil) =>
            val comment = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
            // TODO support the sql text
            tableDesc = tableDesc.copy(viewText = Option(comment))
          case Token("TOK_TABLEPARTCOLS", list @ Token("TOK_TABCOLLIST", _) :: Nil) =>
            val cols = BaseSemanticAnalyzer.getColumns(list(0), false)
            if (cols != null) {
              tableDesc = tableDesc.copy(
                partitionColumns = cols.map { field =>
                  HiveColumn(field.getName, field.getType, field.getComment)
                })
            }
          case Token("TOK_TABLEROWFORMAT", Token("TOK_SERDEPROPS", child :: Nil) :: Nil) =>
            val serdeParams = new java.util.HashMap[String, String]()
            child match {
              case Token("TOK_TABLEROWFORMATFIELD", rowChild1 :: rowChild2) =>
                val fieldDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild1.getText())
                serdeParams.put(serdeConstants.FIELD_DELIM, fieldDelim)
                serdeParams.put(serdeConstants.SERIALIZATION_FORMAT, fieldDelim)
                if (rowChild2.length > 1) {
                  val fieldEscape = BaseSemanticAnalyzer.unescapeSQLString(rowChild2(0).getText)
                  serdeParams.put(serdeConstants.ESCAPE_CHAR, fieldEscape)
                }
              case Token("TOK_TABLEROWFORMATCOLLITEMS", rowChild :: Nil) =>
                val collItemDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
                serdeParams.put(serdeConstants.COLLECTION_DELIM, collItemDelim)
              case Token("TOK_TABLEROWFORMATMAPKEYS", rowChild :: Nil) =>
                val mapKeyDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
                serdeParams.put(serdeConstants.MAPKEY_DELIM, mapKeyDelim)
              case Token("TOK_TABLEROWFORMATLINES", rowChild :: Nil) =>
                val lineDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
                if (!(lineDelim == "\n") && !(lineDelim == "10")) {
                  throw new AnalysisException(
                    SemanticAnalyzer.generateErrorMessage(
                      rowChild,
                      ErrorMsg.LINES_TERMINATED_BY_NON_NEWLINE.getMsg))
                }
                serdeParams.put(serdeConstants.LINE_DELIM, lineDelim)
              case Token("TOK_TABLEROWFORMATNULL", rowChild :: Nil) =>
                val nullFormat = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
                // TODO support the nullFormat
              case _ => assert(false)
            }
            tableDesc = tableDesc.copy(
              serdeProperties = tableDesc.serdeProperties ++ serdeParams)
          case Token("TOK_TABLELOCATION", child :: Nil) =>
            var location = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
            location = EximUtil.relativeToAbsolutePath(hiveConf, location)
            tableDesc = tableDesc.copy(location = Option(location))
          case Token("TOK_TABLESERIALIZER", child :: Nil) =>
            tableDesc = tableDesc.copy(
              serde = Option(BaseSemanticAnalyzer.unescapeSQLString(child.getChild(0).getText)))
            if (child.getChildCount == 2) {
              val serdeParams = new java.util.HashMap[String, String]()
              BaseSemanticAnalyzer.readProps(
                (child.getChild(1).getChild(0)).asInstanceOf[ASTNode], serdeParams)
              tableDesc = tableDesc.copy(serdeProperties = tableDesc.serdeProperties ++ serdeParams)
            }
          case Token("TOK_FILEFORMAT_GENERIC", child :: Nil) =>
            throw new SemanticException(
              s"Unrecognized file format in STORED AS clause: ${child.getText}")

case Token("TOK_TBLRCFILE", Nil) =>
tableDesc = tableDesc.copy(
inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
if (tableDesc.serde.isEmpty) {
tableDesc = tableDesc.copy(
serde = Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))
}

case Token("TOK_TBLORCFILE", Nil) =>
tableDesc = tableDesc.copy(
inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
if (tableDesc.serde.isEmpty) {
tableDesc = tableDesc.copy(
serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
}

case Token("TOK_TBLPARQUETFILE", Nil) =>
tableDesc = tableDesc.copy(
inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
inputFormat =
Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
outputFormat =
Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
if (tableDesc.serde.isEmpty) {
tableDesc = tableDesc.copy(
serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
}
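The three STORED AS shortcuts then behave like this (sketch; assumes hiveContext and src):

// Each branch fixes the input/output formats and, when the statement did not
// already name a SerDe, fills in that format's default one.
hiveContext.sql("CREATE TABLE t_rc STORED AS RCFILE AS SELECT key, value FROM src")
hiveContext.sql("CREATE TABLE t_orc STORED AS ORC AS SELECT key, value FROM src")
hiveContext.sql("CREATE TABLE t_parquet STORED AS PARQUET AS SELECT key, value FROM src")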

case Token("TOK_TABLESERIALIZER",
Token("TOK_SERDENAME", Token(serdeName, Nil) :: otherProps) :: Nil) =>
Expand All @@ -627,13 +757,20 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C

case Token("TOK_TABLEPROPERTIES", list :: Nil) =>
tableDesc = tableDesc.copy(properties = tableDesc.properties ++ getProperties(list))

case _ =>
case list @ Token("TOK_TABLEFILEFORMAT", _) =>
tableDesc = tableDesc.copy(
inputFormat =
Option(BaseSemanticAnalyzer.unescapeSQLString(list.getChild(0).getText)),
outputFormat =
Option(BaseSemanticAnalyzer.unescapeSQLString(list.getChild(1).getText)))
case Token("TOK_STORAGEHANDLER", _) =>
throw new AnalysisException(ErrorMsg.CREATE_NON_NATIVE_AS.getMsg())
case _ => // Unsupport features
}

CreateTableAsSelect(tableDesc, nodeToPlan(query), allowExisting != None)

// If its not a "CREATE TABLE AS" like above then just pass it back to hive as a native command.
// If its not a "CTAS" like above then take it as a native command
case Token("TOK_CREATETABLE", _) => NativePlaceholder

// Support "TRUNCATE TABLE table_name [PARTITION partition_spec]"
Expand Down
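And for the last two branches (sketch; assumes hiveContext and src; the storage-handler class name is a placeholder):

// TOK_TABLEFILEFORMAT: explicit INPUTFORMAT/OUTPUTFORMAT class names are honored.
hiveContext.sql("""
  CREATE TABLE t6
  STORED AS
    INPUTFORMAT 'org.apache.hadoop.mapred.SequenceFileInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.mapred.SequenceFileOutputFormat'
  AS SELECT key, value FROM src""")

// TOK_STORAGEHANDLER: CTAS into a storage handler is rejected with Hive's
// CREATE_NON_NATIVE_AS error, matching Hive's own behavior.
// hiveContext.sql(
//   "CREATE TABLE t7 STORED BY 'some.StorageHandlerClass' AS SELECT * FROM src")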
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala

@@ -225,6 +225,12 @@ private[hive] class ClientWrapper(

      table.partitionColumns.map(c => new FieldSchema(c.name, c.hiveType, c.comment)))
    table.properties.foreach { case (k, v) => qlTable.setProperty(k, v) }
    table.serdeProperties.foreach { case (k, v) => qlTable.setSerdeParam(k, v) }

    // set owner
    qlTable.setOwner(conf.getUser)
    // set create time
    qlTable.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int])

    version match {
      case hive.v12 =>
        table.location.map(new URI(_)).foreach(u => qlTable.call[URI, Unit]("setDataLocation", u))
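Hive stores a table's create time as whole seconds in an Int, hence the division by 1000 above. One way to observe the new metadata (sketch; assumes hiveContext):

// Owner and creation time should now be populated in the table's metadata.
hiveContext.sql("CREATE TABLE t8 AS SELECT 1 AS id")
hiveContext.sql("DESCRIBE FORMATTED t8").collect().foreach(println)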