Commit

Update code as feedback

chenghao-intel committed May 10, 2015
1 parent f4e243f commit a8260e8

Showing 3 changed files with 24 additions and 30 deletions.
@@ -424,18 +424,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive

         val desc = table.copy(schema = schema)
 
-        // This is a hack, we only take the RC, ORC and Parquet as specific storage
-        // otherwise, we will convert it into Parquet2 when hive.convertCTAS specified
-        val specificStorage = (table.inputFormat.map(format => {
-          // org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat => Parquet
-          // org.apache.hadoop.hive.ql.io.orc.OrcInputFormat => Orc
-          // org.apache.hadoop.hive.ql.io.RCFileInputFormat => RCFile
-          // parquet.hive.DeprecatedParquetInputFormat => Parquet
-          // TODO configurable?
-          format.contains("Orc") || format.contains("Parquet") || format.contains("RCFile")
-        }).getOrElse(false))
-
-        if (hive.convertCTAS && !specificStorage) {
+        if (hive.convertCTAS && table.serde.isEmpty) {
           // Do the conversion when spark.sql.hive.convertCTAS is true and the query
           // does not specify any storage format (file format and storage handler).
           if (table.specifiedDatabase.isDefined) {
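
The heart of the change: instead of pattern-matching input format class names, the conversion check now keys off whether the statement supplied a SerDe at all. A minimal sketch of that gating logic, with a hypothetical TableDesc standing in for Spark's real table descriptor (shouldConvert and the sample values are invented for illustration):

    // serde is assumed to be None exactly when the CTAS statement
    // specified no storage format (no STORED AS / ROW FORMAT clause).
    case class TableDesc(name: String, serde: Option[String])

    object ConvertCtasSketch {
      // Convert to a data source table only when the flag is on and
      // the user left the storage format unspecified.
      def shouldConvert(convertCTAS: Boolean, table: TableDesc): Boolean =
        convertCTAS && table.serde.isEmpty

      def main(args: Array[String]): Unit = {
        val bare   = TableDesc("t1", serde = None) // CREATE TABLE t1 AS SELECT ...
        val stored = TableDesc("t2", Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
        println(shouldConvert(convertCTAS = true, bare))   // true: rewritten as a data source table
        println(shouldConvert(convertCTAS = true, stored)) // false: stays on the Hive CTAS path
      }
    }
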
@@ -454,9 +443,18 @@
             child
           )
         } else {
+          val desc = if (table.serde.isEmpty) {
+            // add default serde
+            table.copy(
+              serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+          } else {
+            table
+          }
+
           val (dbName, tblName) =
             processDatabaseAndTableName(
-              table.specifiedDatabase.getOrElse(client.currentDatabase), table.name)
+              desc.specifiedDatabase.getOrElse(client.currentDatabase), desc.name)
 
           execution.CreateTableAsSelect(
             desc.copy(
               specifiedDatabase = Some(dbName),
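The defaulting that HiveQl used to do at parse time is now applied here, only on the Hive CTAS path. A small sketch of the copy-with-default pattern, again with a hypothetical TableDesc in place of the real descriptor type:

    case class TableDesc(name: String, serde: Option[String])

    object DefaultSerdeSketch {
      private val DefaultSerde = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"

      // Fill in Hive's LazySimpleSerDe only when none was specified, so
      // downstream code can rely on serde being defined.
      def withDefaultSerde(table: TableDesc): TableDesc =
        if (table.serde.isEmpty) table.copy(serde = Some(DefaultSerde)) else table

      def main(args: Array[String]): Unit = {
        println(withDefaultSerde(TableDesc("t", None)))            // default filled in
        println(withDefaultSerde(TableDesc("t", Some("MySerDe")))) // left untouched
      }
    }
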
28 changes: 12 additions & 16 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -64,7 +64,11 @@ case class CreateTableAsSelect(

   override def output: Seq[Attribute] = Seq.empty[Attribute]
   override lazy val resolved: Boolean =
-    tableDesc.specifiedDatabase.isDefined && tableDesc.schema.size > 0 && childrenResolved
+    // TODO add more conditions?
+    tableDesc.specifiedDatabase.isDefined &&
+      tableDesc.schema.size > 0 &&
+      tableDesc.serde.isDefined &&
+      childrenResolved
 }
 
 /** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
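
The resolved check above now also waits for the SerDe, so the analyzer will not plan the CTAS until HiveMetastoreCatalog has supplied a default. A toy illustration of that contract, with a made-up CtasNode in place of Spark's LogicalPlan:

    case class CtasNode(
        database: Option[String],
        schemaSize: Int,
        serde: Option[String],
        childrenResolved: Boolean) {
      // Mirrors the new condition: the node reports resolved only once
      // every field it depends on has been filled in.
      lazy val resolved: Boolean =
        database.isDefined && schemaSize > 0 && serde.isDefined && childrenResolved
    }

    object ResolvedSketch extends App {
      val before = CtasNode(None, 0, None, childrenResolved = true)
      val after  = CtasNode(Some("default"), 3, Some("LazySimpleSerDe"), childrenResolved = true)
      println(before.resolved) // false: still needs database, schema, and serde
      println(after.resolved)  // true: safe to plan execution
    }
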
@@ -607,26 +611,24 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
         serde = None,
         viewText = None)
 
-      // default serde & input/output format
-      tableDesc = if ("SequenceFile".equalsIgnoreCase(
-          hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) {
+      // default storage type abbreviation (e.g. RCFile, ORC, PARQUET etc.)
+      val defaultStorageType = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT)
+      // handle the default format for the storage type abbreviation
+      tableDesc = if ("SequenceFile".equalsIgnoreCase(defaultStorageType)) {
         tableDesc.copy(
           inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
           outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat"))
-      } else if ("RCFile".equalsIgnoreCase(
-          hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) {
+      } else if ("RCFile".equalsIgnoreCase(defaultStorageType)) {
         tableDesc.copy(
           inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
           outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"),
           serde = Option(hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE)))
-      } else if ("ORC".equalsIgnoreCase(
-          hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) {
+      } else if ("ORC".equalsIgnoreCase(defaultStorageType)) {
         tableDesc.copy(
           inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
           outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
           serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
-      } else if ("PARQUET".equalsIgnoreCase(
-          hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) {
+      } else if ("PARQUET".equalsIgnoreCase(defaultStorageType)) {
         tableDesc.copy(
           inputFormat =
             Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
@@ -766,12 +768,6 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
         case _ => // Unsupport features
       }
 
-      if (tableDesc.serde.isEmpty) {
-        // add default serde
-        tableDesc = tableDesc.copy(
-          serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
-      }
-
       CreateTableAsSelect(tableDesc, nodeToPlan(query), allowExisting != None)
 
     // If its not a "CTAS" like above then take it as a native command
@@ -137,7 +137,7 @@ class HiveQlSuite extends FunSuite with BeforeAndAfterAll {
     assert(desc.serdeProperties == Map())
     assert(desc.inputFormat == Option("org.apache.hadoop.mapred.TextInputFormat"))
     assert(desc.outputFormat == Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
-    assert(desc.serde == Option("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+    assert(desc.serde.isEmpty)
     assert(desc.properties == Map())
   }
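
Taken together, the three files move SerDe defaulting from parse time to planning time, and the updated assertion checks exactly that. A compact end-to-end sketch with stand-in types (parseCtas and prepareForHiveCtas are hypothetical names for the two stages):

    case class TableDesc(serde: Option[String])

    object PipelineSketch {
      // HiveQl no longer fills in a SerDe at parse time.
      def parseCtas(): TableDesc = TableDesc(serde = None)

      // HiveMetastoreCatalog adds the default just before the Hive CTAS runs.
      def prepareForHiveCtas(desc: TableDesc): TableDesc =
        if (desc.serde.isEmpty)
          desc.copy(serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
        else desc

      def main(args: Array[String]): Unit = {
        val parsed = parseCtas()
        assert(parsed.serde.isEmpty)                       // what the updated test asserts
        assert(prepareForHiveCtas(parsed).serde.isDefined) // defaulted downstream
      }
    }
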

