[SPARK-7411] [SQL] Support SerDe for HiveQl in CTAS #5963

Closed
Changes from 3 commits
@@ -407,64 +407,60 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
* For example, because of a CREATE TABLE X AS statement.
*/
object CreateTables extends Rule[LogicalPlan] {
import org.apache.hadoop.hive.ql.Context
import org.apache.hadoop.hive.ql.parse.{ASTNode, QB, SemanticAnalyzer}

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// Wait until children are resolved.
case p: LogicalPlan if !p.childrenResolved => p

case CreateTableAsSelect(desc, child, allowExisting) =>
if (hive.convertCTAS && !desc.serde.isDefined) {
// Do the conversion when spark.sql.hive.convertCTAS is true and the query
// does not specify any storage format (file format and storage handler).
if (desc.specifiedDatabase.isDefined) {
throw new AnalysisException(
"Cannot specify database name in a CTAS statement " +
"when spark.sql.hive.convertCTAS is set to true.")
}

val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTableUsingAsSelect(
desc.name,
conf.defaultDataSourceName,
temporary = false,
mode,
options = Map.empty[String, String],
child
)
case p: LogicalPlan if p.resolved => p
case p @ CreateTableAsSelect(table, child, allowExisting) =>
val schema = if (table.schema.size > 0) {
table.schema
} else {
execution.CreateTableAsSelect(
desc.copy(
specifiedDatabase = Option(desc.specifiedDatabase.getOrElse(client.currentDatabase))),
child,
allowExisting)
child.output.map {
attr => new HiveColumn(
attr.name,
HiveMetastoreTypes.toMetastoreType(attr.dataType), null)
}
}

case p: LogicalPlan if p.resolved => p
val desc = table.copy(schema = schema)

case p @ CreateTableAsSelect(desc, child, allowExisting) =>
val (dbName, tblName) = processDatabaseAndTableName(desc.database, desc.name)
// This is a hack: we treat only RC, ORC and Parquet as user-specified storage
// formats; otherwise the table is converted to the default data source when
// hive.convertCTAS is set.
val specificStorage = (table.inputFormat.map(format => {
// org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat => Parquet
// org.apache.hadoop.hive.ql.io.orc.OrcInputFormat => Orc
// org.apache.hadoop.hive.ql.io.RCFileInputFormat => RCFile
// parquet.hive.DeprecatedParquetInputFormat => Parquet
// TODO configurable?
format.contains("Orc") || format.contains("Parquet") || format.contains("RCFile")
}).getOrElse(false))
Comment (Contributor):

I'm not sure this is really the logic we want. The goal here is that, by default (i.e. if the user does not specify anything about storage) and convertCTAS is turned on, we use the data sources API. Would it be possible to have the parser fill in the storage options only when the user specifies them, and defer filling in default values until we are in the analyzer? That way we can distinguish "no storage options specified" from "default storage options chosen".
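A minimal sketch of that proposal (hypothetical code, not part of this diff): the parser would leave storage fields as None unless the user wrote them, so the analyzer can tell the two cases apart.

// Hypothetical sketch: storage fields stay None unless explicitly written in the DDL.
val userSpecifiedStorage =
  table.inputFormat.isDefined || table.outputFormat.isDefined || table.serde.isDefined

if (hive.convertCTAS && !userSpecifiedStorage) {
  // No storage clause at all: convert to the data sources API path.
} else {
  // A format was requested (or convertCTAS is off): fill in Hive's
  // defaults here, in the analyzer, and keep the Hive CTAS plan.
}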

Comment (PR author):

OK, I see. I will move the default SerDe from HiveQl to the Analyzer.


if (hive.convertCTAS) {
if (desc.specifiedDatabase.isDefined) {
if (hive.convertCTAS && !specificStorage) {
// Do the conversion when spark.sql.hive.convertCTAS is true and the query
// does not specify any storage format (file format and storage handler).
if (table.specifiedDatabase.isDefined) {
throw new AnalysisException(
"Cannot specify database name in a CTAS statement " +
"when spark.sql.hive.convertCTAS is set to true.")
"when spark.sql.hive.convertCTAS is set to true.")
}

val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTableUsingAsSelect(
tblName,
conf.defaultDataSourceName,
desc.name,
hive.conf.defaultDataSourceName,
temporary = false,
mode,
options = Map.empty[String, String],
child
)
} else {
val (dbName, tblName) =
processDatabaseAndTableName(
table.specifiedDatabase.getOrElse(client.currentDatabase), table.name)
execution.CreateTableAsSelect(
desc,
desc.copy(
specifiedDatabase = Some(dbName),
name = tblName),
child,
allowExisting)
}
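A hedged usage sketch (assumes a HiveContext named hiveContext; not part of the diff) of which branch a CTAS query takes under this logic:

// Data source path: convertCTAS on and no storage format in the statement.
hiveContext.setConf("spark.sql.hive.convertCTAS", "true")
hiveContext.sql("CREATE TABLE t1 AS SELECT 1 AS id")
// Hive SerDe path: an explicit STORED AS clause keeps the Hive CTAS plan.
hiveContext.sql("CREATE TABLE t2 STORED AS RCFILE AS SELECT 1 AS id")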
209 changes: 175 additions & 34 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -22,14 +22,15 @@ import java.sql.Date
import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.Context
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.ql.{ErrorMsg, Context}
import org.apache.hadoop.hive.ql.exec.{FunctionRegistry, FunctionInfo}
import org.apache.hadoop.hive.ql.lib.Node
import org.apache.hadoop.hive.ql.metadata.Table
import org.apache.hadoop.hive.ql.parse._
import org.apache.hadoop.hive.ql.plan.PlanUtils
import org.apache.spark.sql.AnalysisException
import org.apache.hadoop.hive.ql.session.SessionState

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
@@ -62,7 +63,8 @@ case class CreateTableAsSelect(
allowExisting: Boolean) extends UnaryNode with Command {

override def output: Seq[Attribute] = Seq.empty[Attribute]
override lazy val resolved: Boolean = tableDesc.specifiedDatabase.isDefined && childrenResolved
override lazy val resolved: Boolean =
tableDesc.specifiedDatabase.isDefined && tableDesc.schema.size > 0 && childrenResolved
Comment (Contributor):

Perhaps tableDesc should have a resolved that is only true when all options are filled in?

}
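A hedged sketch of that suggestion (hypothetical, not code from this PR): give the table descriptor its own resolved flag and let the plan delegate to it.

case class HiveColumn(name: String, hiveType: String, comment: String)

case class HiveTable(
    specifiedDatabase: Option[String],
    name: String,
    schema: Seq[HiveColumn],
    serde: Option[String]) {  // other storage fields elided
  // True only once the analyzer has filled in every required option.
  def resolved: Boolean =
    specifiedDatabase.isDefined && schema.nonEmpty && serde.isDefined
}

// The plan side would then read:
//   override lazy val resolved: Boolean = tableDesc.resolved && childrenResolved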

/** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
@@ -240,12 +242,24 @@ private[hive] object HiveQl {
* Otherwise, a NullPointerException will be thrown
* when retrieving properties from HiveConf.
*/
val hContext = new Context(new HiveConf())
val hContext = new Context(hiveConf)
val node = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql, hContext))
hContext.clear()
node
}
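A hedged usage sketch (hypothetical call, not in the diff): getAst turns a HiveQL string into Hive's AST, which parseSql below then maps to a LogicalPlan.

val ast: ASTNode = HiveQl.getAst("CREATE TABLE t (id INT) STORED AS ORC")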

/**
* Returns the HiveConf
* TODO get it from HiveContext?
Comment (Contributor):

I think this implementation is reasonable and you can remove the TODO. HiveContext will fill in the SessionState.

Comment (PR author):

I mean we would need to pass the HiveContext object into HiveQl and get the HiveConf via code like hiveContext.hiveconf. Anyway, I will remove the TODO for now.

*/
private[this] def hiveConf(): HiveConf = {
Comment (Contributor):

@nitin2goyal what do you think of this implementation? I think it will also give you the speed-up you are looking for, as long as you are parsing queries with a HiveContext. It also has the advantage of using any conf options that are SET by the user.

Comment (nitin2goyal):

Seems fine to me. Since it's already been done here, should I do just this change for the 1.2 and 1.3 branches?
Issue - https://issues.apache.org/jira/browse/SPARK-7331

Comment (Contributor):

That sounds good to me.

val ss = SessionState.get() // SessionState is lazily initialized, so it can be null here
if (ss == null) {
new HiveConf()
} else {
ss.getConf
}
}
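An equivalent, more compact form of the fallback above (a sketch, not what the PR commits):

// SessionState.get() is thread-local and only set once a session starts,
// so degrade to a fresh HiveConf when parsing outside a Hive session.
val conf = Option(SessionState.get()).map(_.getConf).getOrElse(new HiveConf())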

/** Returns a LogicalPlan for a given HiveQL string. */
def parseSql(sql: String): LogicalPlan = hqlParser.parse(sql)
@@ -476,8 +490,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
DropTable(tableName, ifExists.nonEmpty)
// Support "ANALYZE TABLE tableNmae COMPUTE STATISTICS noscan"
case Token("TOK_ANALYZE",
Token("TOK_TAB", Token("TOK_TABNAME", tableNameParts) :: partitionSpec) ::
isNoscan) =>
Token("TOK_TAB", Token("TOK_TABNAME", tableNameParts) :: partitionSpec) ::
isNoscan) =>
// Reference:
// https://cwiki.apache.org/confluence/display/Hive/StatsDev#StatsDev-ExistingTables
if (partitionSpec.nonEmpty) {
@@ -547,13 +561,15 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
val (
Some(tableNameParts) ::
_ /* likeTable */ ::
externalTable ::
Some(query) ::
allowExisting +:
ignores) =
getClauses(
Seq(
"TOK_TABNAME",
"TOK_LIKETABLE",
"EXTERNAL",
"TOK_QUERY",
"TOK_IFNOTEXISTS",
"TOK_TABLECOMMENT",
@@ -576,43 +592,155 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
children)
val (db, tableName) = extractDbNameTableName(tableNameParts)

var tableDesc =
HiveTable(
specifiedDatabase = db,
name = tableName,
schema = Seq.empty,
partitionColumns = Seq.empty,
properties = Map.empty,
serdeProperties = Map.empty,
tableType = ManagedTable,
location = None,
inputFormat = None,
outputFormat = None,
serde = None)

// TODO: Handle all the cases here...
children.foreach {
case Token("TOK_TBLRCFILE", Nil) =>
import org.apache.hadoop.hive.ql.io.{RCFileInputFormat, RCFileOutputFormat}
// TODO add bucket support
var tableDesc: HiveTable = HiveTable(
specifiedDatabase = db,
name = tableName,
schema = Seq.empty[HiveColumn],
partitionColumns = Seq.empty[HiveColumn],
properties = Map[String, String](),
serdeProperties = Map[String, String](),
tableType = if (externalTable.isDefined) ExternalTable else ManagedTable,
location = None,
inputFormat = None,
outputFormat = None,
serde = None,
viewText = None)

// default serde & input/output format
tableDesc = if ("SequenceFile".equalsIgnoreCase(
hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) {
Comment (Contributor):

Perhaps assign hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT) to a variable so the wrapping doesn't have to be so hard to follow.

tableDesc.copy(
inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat"))
} else if ("RCFile".equalsIgnoreCase(
hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) {
tableDesc.copy(
inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"),
serde = Option(hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE)))
} else if ("ORC".equalsIgnoreCase(
hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) {
tableDesc.copy(
inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
} else if ("PARQUET".equalsIgnoreCase(
hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) {
tableDesc.copy(
inputFormat =
Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
outputFormat =
Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
serde =
Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
} else {
tableDesc.copy(
inputFormat =
Option("org.apache.hadoop.mapred.TextInputFormat"),
outputFormat =
Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
}
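A hedged sketch of the reviewer's suggestion above (not the code in this PR): read the default file format once, then branch on it, so the lines wrap less.

val defaultFormat = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT)
tableDesc = defaultFormat.toUpperCase match {
  case "SEQUENCEFILE" =>
    tableDesc.copy(
      inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
      outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat"))
  case "RCFILE" =>
    tableDesc.copy(
      inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
      outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"),
      serde = Option(hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE)))
  // ORC and Parquet branches elided; they follow the same pattern.
  case _ =>
    tableDesc.copy(
      inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"),
      outputFormat = Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
}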

children.collect {
case list @ Token("TOK_TABCOLLIST", _) =>
val cols = BaseSemanticAnalyzer.getColumns(list, true)
if (cols != null) {
tableDesc = tableDesc.copy(
schema = cols.map { field =>
HiveColumn(field.getName, field.getType, field.getComment)
})
}
case Token("TOK_TABLECOMMENT", child :: Nil) =>
val comment = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
// TODO support the sql text
tableDesc = tableDesc.copy(viewText = Option(comment))
case Token("TOK_TABLEPARTCOLS", list @ Token("TOK_TABCOLLIST", _) :: Nil) =>
val cols = BaseSemanticAnalyzer.getColumns(list(0), false)
if (cols != null) {
tableDesc = tableDesc.copy(
partitionColumns = cols.map { field =>
HiveColumn(field.getName, field.getType, field.getComment)
})
}
case Token("TOK_TABLEROWFORMAT", Token("TOK_SERDEPROPS", child :: Nil) :: Nil)=>
val serdeParams = new java.util.HashMap[String, String]()
child match {
case Token("TOK_TABLEROWFORMATFIELD", rowChild1 :: rowChild2) =>
val fieldDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild1.getText())
serdeParams.put(serdeConstants.FIELD_DELIM, fieldDelim)
serdeParams.put(serdeConstants.SERIALIZATION_FORMAT, fieldDelim)
if (rowChild2.length > 1) {
val fieldEscape = BaseSemanticAnalyzer.unescapeSQLString(rowChild2(0).getText)
serdeParams.put(serdeConstants.ESCAPE_CHAR, fieldEscape)
}
case Token("TOK_TABLEROWFORMATCOLLITEMS", rowChild :: Nil) =>
val collItemDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
serdeParams.put(serdeConstants.COLLECTION_DELIM, collItemDelim)
case Token("TOK_TABLEROWFORMATMAPKEYS", rowChild :: Nil) =>
val mapKeyDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
serdeParams.put(serdeConstants.MAPKEY_DELIM, mapKeyDelim)
case Token("TOK_TABLEROWFORMATLINES", rowChild :: Nil) =>
val lineDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
if (lineDelim != "\n" && lineDelim != "10") {
throw new AnalysisException(
SemanticAnalyzer.generateErrorMessage(
rowChild,
ErrorMsg.LINES_TERMINATED_BY_NON_NEWLINE.getMsg))
}
serdeParams.put(serdeConstants.LINE_DELIM, lineDelim)
case Token("TOK_TABLEROWFORMATNULL", rowChild :: Nil) =>
val nullFormat = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
// TODO support the nullFormat
case _ => assert(false)
}
tableDesc = tableDesc.copy(
serdeProperties = tableDesc.serdeProperties ++ serdeParams)
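// Hedged example, not part of the PR: a clause like
//   ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
// arrives as TOK_TABLEROWFORMATFIELD / TOK_TABLEROWFORMATLINES children and
// leaves serdeProperties with field.delim -> ",",
// serialization.format -> "," and line.delim -> "\n".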
case Token("TOK_TABLELOCATION", child :: Nil) =>
var location = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
location = EximUtil.relativeToAbsolutePath(hiveConf, location)
tableDesc = tableDesc.copy(location = Option(location))
case Token("TOK_TABLESERIALIZER", child :: Nil) =>
tableDesc = tableDesc.copy(
outputFormat = Option(classOf[RCFileOutputFormat].getName),
inputFormat = Option(classOf[RCFileInputFormat[_, _]].getName))
serde = Option(BaseSemanticAnalyzer.unescapeSQLString(child.getChild(0).getText)))
if (child.getChildCount == 2) {
val serdeParams = new java.util.HashMap[String, String]()
BaseSemanticAnalyzer.readProps(
(child.getChild(1).getChild(0)).asInstanceOf[ASTNode], serdeParams)
tableDesc = tableDesc.copy(serdeProperties = tableDesc.serdeProperties ++ serdeParams)
}
case Token("TOK_FILEFORMAT_GENERIC", child :: Nil) =>
throw new SemanticException(
s"Unrecognized file format in STORED AS clause: ${child.getText}")

case Token("TOK_TBLRCFILE", Nil) =>
tableDesc = tableDesc.copy(
inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
if (tableDesc.serde.isEmpty) {
tableDesc = tableDesc.copy(
serde = Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))
}

case Token("TOK_TBLORCFILE", Nil) =>
tableDesc = tableDesc.copy(
inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
if (tableDesc.serde.isEmpty) {
tableDesc = tableDesc.copy(
serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
}

case Token("TOK_TBLPARQUETFILE", Nil) =>
tableDesc = tableDesc.copy(
inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
inputFormat =
Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
outputFormat =
Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
if (tableDesc.serde.isEmpty) {
tableDesc = tableDesc.copy(
serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
}

case Token("TOK_TABLESERIALIZER",
Token("TOK_SERDENAME", Token(serdeName, Nil) :: otherProps) :: Nil) =>
Expand All @@ -627,13 +755,26 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C

case Token("TOK_TABLEPROPERTIES", list :: Nil) =>
tableDesc = tableDesc.copy(properties = tableDesc.properties ++ getProperties(list))
case list @ Token("TOK_TABLEFILEFORMAT", _) =>
tableDesc = tableDesc.copy(
inputFormat =
Option(BaseSemanticAnalyzer.unescapeSQLString(list.getChild(0).getText)),
outputFormat =
Option(BaseSemanticAnalyzer.unescapeSQLString(list.getChild(1).getText)))
case Token("TOK_STORAGEHANDLER", _) =>
throw new AnalysisException(ErrorMsg.CREATE_NON_NATIVE_AS.getMsg())
case _ => // Unsupported features
}

case _ =>
if (tableDesc.serde.isEmpty) {
// add default serde
tableDesc = tableDesc.copy(
serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
}
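// Hedged note, not from the diff: LazySimpleSerDe is Hive's default row
// SerDe for plain-text tables, hence this fallback.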

CreateTableAsSelect(tableDesc, nodeToPlan(query), allowExisting != None)

// If its not a "CREATE TABLE AS" like above then just pass it back to hive as a native command.
// If its not a "CTAS" like above then take it as a native command
case Token("TOK_CREATETABLE", _) => NativePlaceholder

// Support "TRUNCATE TABLE table_name [PARTITION partition_spec]"